Skip to content

Commit

Permalink
Workaround SQLite driver bug on fetch_one
Browse files Browse the repository at this point in the history
  • Loading branch information
lizardoluis committed Sep 6, 2024
1 parent bafff1b commit 8a47781
Showing 1 changed file with 71 additions and 18 deletions.
89 changes: 71 additions & 18 deletions src/repository/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,32 @@ impl Repository for $repo {
}

async fn create_database(&self, database_name: &str) -> Result<DatabaseId, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let id = sqlx::query(r#"INSERT INTO database (name) VALUES ($1) RETURNING (id)"#)
.bind(database_name)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(id)
}

async fn get_database(
&self,
name: &str,
) -> Result<DatabaseRecord, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let database = sqlx::query_as(r#"SELECT id, name FROM database WHERE database.name = $1"#)
.bind(name)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(database)
}

Expand All @@ -163,6 +171,8 @@ impl Repository for $repo {
database_name: &str,
collection_name: &str,
) -> Result<CollectionRecord, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let collection = sqlx::query_as(
r#"
SELECT collection.id, database.id AS database_id, collection.name
Expand All @@ -172,9 +182,11 @@ impl Repository for $repo {
)
.bind(database_name)
.bind(collection_name)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(collection)
}

Expand All @@ -184,6 +196,8 @@ impl Repository for $repo {
collection_name: &str,
table_name: &str,
) -> Result<TableRecord, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let table = sqlx::query_as(
r#"
SELECT "table".id, collection.id as collection_id, "table".name
Expand All @@ -196,9 +210,11 @@ impl Repository for $repo {
.bind(database_name)
.bind(collection_name)
.bind(table_name)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(table)
}

Expand All @@ -207,13 +223,17 @@ impl Repository for $repo {
database_id: DatabaseId,
collection_name: &str,
) -> Result<CollectionId, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let id = sqlx::query(
r#"INSERT INTO "collection" (database_id, name) VALUES ($1, $2) RETURNING (id)"#,
).bind(database_id).bind(collection_name)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(id)
}

Expand All @@ -224,14 +244,16 @@ impl Repository for $repo {
schema: &Schema,
uuid: Uuid,
) -> Result<(TableId, TableVersionId), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

// Create new (empty) table
let new_table_id: i64 = sqlx::query(
r#"INSERT INTO "table" (collection_id, name, uuid) VALUES ($1, $2, $3) RETURNING (id)"#,
)
.bind(collection_id)
.bind(table_name)
.bind(uuid)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

Expand All @@ -240,10 +262,12 @@ impl Repository for $repo {
r#"INSERT INTO table_version (table_id) VALUES ($1) RETURNING (id)"#,
)
.bind(new_table_id)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

// Create columns
// TODO this breaks if we have more than (bind limit) columns
if !schema.fields().is_empty() {
Expand Down Expand Up @@ -289,13 +313,15 @@ impl Repository for $repo {
uuid: Uuid,
version: i64,
) -> Result<TableVersionId, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

// For now we only support linear history
let last_version_id: TableVersionId = sqlx::query(r#"SELECT max(table_version.id) AS id
FROM table_version
JOIN "table" ON table_version.table_id = "table".id
WHERE "table".uuid = $1"#)
.bind(uuid)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

Expand All @@ -306,10 +332,12 @@ impl Repository for $repo {
)
.bind(version)
.bind(last_version_id)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

sqlx::query(
"INSERT INTO table_column (table_version_id, name, type)
SELECT $2, name, type FROM table_column WHERE table_version_id = $1;",
Expand Down Expand Up @@ -374,14 +402,17 @@ impl Repository for $repo {
new_table_name: &str,
new_collection_id: Option<CollectionId>,
) -> Result<(), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;
// Do RETURNING(id) here and ask for the ID back with fetch_one() to force a
// row not found error if the table doesn't exist
let query = if let Some(new_collection_id) = new_collection_id {
sqlx::query("UPDATE \"table\" SET name = $1, collection_id = $2 WHERE id = $3 RETURNING id").bind(new_table_name).bind(new_collection_id).bind(table_id)
} else {
sqlx::query("UPDATE \"table\" SET name = $1 WHERE id = $2 RETURNING id").bind(new_table_name).bind(table_id)
};
query.fetch_one(&self.executor).await.map_err($repo::interpret_error)?;
query.fetch_one(&mut *tx).await.map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;
Ok(())
}

Expand Down Expand Up @@ -411,6 +442,8 @@ impl Repository for $repo {
}
);

let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let new_function_id: i64 = sqlx::query(query.as_str())
.bind(database_id)
.bind(function_name)
Expand All @@ -420,17 +453,21 @@ impl Repository for $repo {
.bind(details.return_type.to_string())
.bind(details.data.clone())
.bind(details.volatility.to_string())
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?
.try_get("id").map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(new_function_id)
}

async fn get_all_functions_in_database(
&self,
database_id: DatabaseId,
) -> Result<Vec<AllDatabaseFunctionsResult>, Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let functions = sqlx::query_as(
r#"
SELECT
Expand All @@ -447,9 +484,11 @@ impl Repository for $repo {
WHERE database_id = $1;
"#)
.bind(database_id)
.fetch_all(&self.executor)
.fetch_all(&mut *tx)
.await.map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(functions)
}

Expand All @@ -469,15 +508,19 @@ impl Repository for $repo {
func_names.iter().map(|_| "$2").collect::<Vec<_>>().join(", ")
);

let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;

let mut query_builder = sqlx::query(&query).bind(database_id);
for func_name in func_names {
query_builder = query_builder.bind(func_name);
}
query_builder
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await
.map_err($repo::interpret_error)?;

tx.commit().await.map_err($repo::interpret_error)?;

Ok(())
}

Expand All @@ -486,26 +529,32 @@ impl Repository for $repo {
// In these methods, return the ID back so that we get an error if the
// table/collection/schema didn't actually exist
async fn delete_table(&self, table_id: TableId) -> Result<(), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;
sqlx::query("DELETE FROM \"table\" WHERE id = $1 RETURNING id")
.bind(table_id)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;
tx.commit().await.map_err($repo::interpret_error)?;
Ok(())
}

async fn delete_collection(&self, collection_id: CollectionId) -> Result<(), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;
sqlx::query("DELETE FROM collection WHERE id = $1 RETURNING id")
.bind(collection_id)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;
tx.commit().await.map_err($repo::interpret_error)?;
Ok(())
}

async fn delete_database(&self, database_id: DatabaseId) -> Result<(), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;
sqlx::query("DELETE FROM database WHERE id = $1 RETURNING id")
.bind(database_id)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;
tx.commit().await.map_err($repo::interpret_error)?;
Ok(())
}

Expand Down Expand Up @@ -539,19 +588,23 @@ impl Repository for $repo {
}

async fn update_dropped_table(&self, uuid: Uuid, deletion_status: DroppedTableDeletionStatus) -> Result<(), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;
sqlx::query("UPDATE dropped_table SET deletion_status = $1 WHERE uuid = $2 RETURNING uuid")
.bind(deletion_status)
.bind(uuid)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;
tx.commit().await.map_err($repo::interpret_error)?;
Ok(())
}

async fn delete_dropped_table(&self, uuid: Uuid) -> Result<(), Error> {
let mut tx = self.executor.begin().await.map_err($repo::interpret_error)?;
sqlx::query("DELETE FROM dropped_table WHERE uuid = $1 RETURNING uuid")
.bind(uuid)
.fetch_one(&self.executor)
.fetch_one(&mut *tx)
.await.map_err($repo::interpret_error)?;
tx.commit().await.map_err($repo::interpret_error)?;
Ok(())
}
}
Expand Down

0 comments on commit 8a47781

Please sign in to comment.