diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fc524f6b..0773e3a08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Sort paginated query field rows by document view id [#354](https://github.com/p2panda/aquadoggo/pull/354) - Fix missing field when filtering owner [#359](https://github.com/p2panda/aquadoggo/pull/359) - Bind untrusted user filter arguments in SQL query [#358](https://github.com/p2panda/aquadoggo/pull/358) +- Handle duplicate document view insertions in reduce task [#410](https://github.com/p2panda/aquadoggo/pull/410) ## [0.4.0] diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs index 39dc6efd5..afce47dc1 100644 --- a/aquadoggo/src/db/stores/document.rs +++ b/aquadoggo/src/db/stores/document.rs @@ -459,7 +459,6 @@ async fn insert_document_view( ) VALUES ($1, $2, $3) - ON CONFLICT(document_view_id) DO NOTHING -- @TODO: temp fix for double document view insertions: https://github.com/p2panda/aquadoggo/issues/398 ", ) .bind(document_view.id().to_string()) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index db3ff12e1..1f750a686 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -36,6 +36,21 @@ use crate::materializer::TaskInput; pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { debug!("Working on {}", input); + // If this task is concerned with a document view then we can first check if it has actually + // already been materialized. If so, we exit this task immediately and return no new tasks. + if let Some(document_view_id) = &input.document_view_id { + let document_view_exists = context + .store + .get_document_by_view_id(document_view_id) + .await + .map_err(|err| TaskError::Critical(err.to_string()))? + .is_some(); + + if document_view_exists { + return Ok(None); + } + } + // Find out which document we are handling let document_id = match resolve_document_id(&context, &input).await? { Some(document_id) => Ok(document_id), @@ -513,4 +528,38 @@ mod tests { assert!(reduce_task(node.context.clone(), input).await.is_ok()); }); } + + #[rstest] + fn duplicate_document_view_insertions( + #[from(populate_store_config)] + #[with(2, 1, 1)] + config: PopulateStoreConfig, + ) { + test_runner(|node: TestNode| async move { + // Populate the store with some entries and operations but DON'T materialise any resulting documents. + let (_, document_ids) = populate_store(&node.context.store, &config).await; + let document_id = document_ids.get(0).expect("At least one document id"); + + // Get the operations and build the document. + let operations = node + .context + .store + .get_operations_by_document_id(document_id) + .await + .unwrap(); + + // Build the document from the operations. + let document_builder = DocumentBuilder::from(&operations); + let document = document_builder.build().unwrap(); + + // Issue a reduce task for the document, which also inserts the current view. + let input = TaskInput::new(Some(document_id.to_owned()), None); + assert!(reduce_task(node.context.clone(), input).await.is_ok()); + + // Issue a reduce task for the document view, which should succeed although no new + // view is inserted. + let input = TaskInput::new(None, Some(document.view_id().to_owned())); + assert!(reduce_task(node.context.clone(), input).await.is_ok()); + }) + } }