diff --git a/src/repository/default.rs b/src/repository/default.rs index 3440af48..84ad5fcd 100644 --- a/src/repository/default.rs +++ b/src/repository/default.rs @@ -137,12 +137,16 @@ impl Repository for $repo { } async fn create_database(&self, database_name: &str) -> Result { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let id = sqlx::query(r#"INSERT INTO database (name) VALUES ($1) RETURNING (id)"#) .bind(database_name) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(id) } @@ -150,11 +154,15 @@ impl Repository for $repo { &self, name: &str, ) -> Result { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let database = sqlx::query_as(r#"SELECT id, name FROM database WHERE database.name = $1"#) .bind(name) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(database) } @@ -163,6 +171,8 @@ impl Repository for $repo { database_name: &str, collection_name: &str, ) -> Result { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let collection = sqlx::query_as( r#" SELECT collection.id, database.id AS database_id, collection.name @@ -172,9 +182,11 @@ impl Repository for $repo { ) .bind(database_name) .bind(collection_name) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(collection) } @@ -184,6 +196,8 @@ impl Repository for $repo { collection_name: &str, table_name: &str, ) -> Result { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let table = sqlx::query_as( r#" SELECT "table".id, collection.id as collection_id, "table".name @@ -196,9 +210,11 @@ impl Repository for $repo { .bind(database_name) .bind(collection_name) .bind(table_name) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(table) } @@ -207,13 +223,17 @@ impl Repository for $repo { database_id: DatabaseId, collection_name: &str, ) -> Result { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let id = sqlx::query( r#"INSERT INTO "collection" (database_id, name) VALUES ($1, $2) RETURNING (id)"#, ).bind(database_id).bind(collection_name) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(id) } @@ -224,6 +244,8 @@ impl Repository for $repo { schema: &Schema, uuid: Uuid, ) -> Result<(TableId, TableVersionId), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + // Create new (empty) table let new_table_id: i64 = sqlx::query( r#"INSERT INTO "table" (collection_id, name, uuid) VALUES ($1, $2, $3) RETURNING (id)"#, @@ -231,7 +253,7 @@ impl Repository for $repo { .bind(collection_id) .bind(table_name) .bind(uuid) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; @@ -240,10 +262,12 @@ impl Repository for $repo { r#"INSERT INTO table_version (table_id) VALUES ($1) RETURNING (id)"#, ) .bind(new_table_id) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + // Create columns // TODO this breaks if we have more than (bind limit) columns if !schema.fields().is_empty() { @@ -289,13 +313,15 @@ impl Repository for $repo { uuid: Uuid, version: i64, ) -> Result { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + // For now we only support linear history let last_version_id: TableVersionId = sqlx::query(r#"SELECT max(table_version.id) AS id FROM table_version JOIN "table" ON table_version.table_id = "table".id WHERE "table".uuid = $1"#) .bind(uuid) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; @@ -306,10 +332,12 @@ impl Repository for $repo { ) .bind(version) .bind(last_version_id) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + sqlx::query( "INSERT INTO table_column (table_version_id, name, type) SELECT $2, name, type FROM table_column WHERE table_version_id = $1;", @@ -374,6 +402,7 @@ impl Repository for $repo { new_table_name: &str, new_collection_id: Option, ) -> Result<(), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; // Do RETURNING(id) here and ask for the ID back with fetch_one() to force a // row not found error if the table doesn't exist let query = if let Some(new_collection_id) = new_collection_id { @@ -381,7 +410,9 @@ impl Repository for $repo { } else { sqlx::query("UPDATE \"table\" SET name = $1 WHERE id = $2 RETURNING id").bind(new_table_name).bind(table_id) }; - query.fetch_one(&self.executor).await.map_err($repo::interpret_error)?; + query.fetch_one(&mut *tx).await.map_err($repo::interpret_error)?; + + tx.commit().await.map_err($repo::interpret_error)?; Ok(()) } @@ -411,6 +442,8 @@ impl Repository for $repo { } ); + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let new_function_id: i64 = sqlx::query(query.as_str()) .bind(database_id) .bind(function_name) @@ -420,10 +453,12 @@ impl Repository for $repo { .bind(details.return_type.to_string()) .bind(details.data.clone()) .bind(details.volatility.to_string()) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)? .try_get("id").map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(new_function_id) } @@ -431,6 +466,8 @@ impl Repository for $repo { &self, database_id: DatabaseId, ) -> Result, Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let functions = sqlx::query_as( r#" SELECT @@ -447,9 +484,11 @@ impl Repository for $repo { WHERE database_id = $1; "#) .bind(database_id) - .fetch_all(&self.executor) + .fetch_all(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(functions) } @@ -469,15 +508,19 @@ impl Repository for $repo { func_names.iter().map(|_| "$2").collect::>().join(", ") ); + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; + let mut query_builder = sqlx::query(&query).bind(database_id); for func_name in func_names { query_builder = query_builder.bind(func_name); } query_builder - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await .map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; + Ok(()) } @@ -486,26 +529,32 @@ impl Repository for $repo { // In these methods, return the ID back so that we get an error if the // table/collection/schema didn't actually exist async fn delete_table(&self, table_id: TableId) -> Result<(), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; sqlx::query("DELETE FROM \"table\" WHERE id = $1 RETURNING id") .bind(table_id) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; Ok(()) } async fn delete_collection(&self, collection_id: CollectionId) -> Result<(), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; sqlx::query("DELETE FROM collection WHERE id = $1 RETURNING id") .bind(collection_id) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; Ok(()) } async fn delete_database(&self, database_id: DatabaseId) -> Result<(), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; sqlx::query("DELETE FROM database WHERE id = $1 RETURNING id") .bind(database_id) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; Ok(()) } @@ -539,19 +588,23 @@ impl Repository for $repo { } async fn update_dropped_table(&self, uuid: Uuid, deletion_status: DroppedTableDeletionStatus) -> Result<(), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; sqlx::query("UPDATE dropped_table SET deletion_status = $1 WHERE uuid = $2 RETURNING uuid") .bind(deletion_status) .bind(uuid) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; Ok(()) } async fn delete_dropped_table(&self, uuid: Uuid) -> Result<(), Error> { + let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?; sqlx::query("DELETE FROM dropped_table WHERE uuid = $1 RETURNING uuid") .bind(uuid) - .fetch_one(&self.executor) + .fetch_one(&mut *tx) .await.map_err($repo::interpret_error)?; + tx.commit().await.map_err($repo::interpret_error)?; Ok(()) } }