Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle duplicate document view insertions #410

Merged
merged 5 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Sort paginated query field rows by document view id [#354](https://github.com/p2panda/aquadoggo/pull/354)
- Fix missing field when filtering owner [#359](https://github.com/p2panda/aquadoggo/pull/359)
- Bind untrusted user filter arguments in SQL query [#358](https://github.com/p2panda/aquadoggo/pull/358)
- Handle duplicate document view insertions in reduce task [#410](https://github.com/p2panda/aquadoggo/pull/410)

## [0.4.0]

Expand Down
1 change: 0 additions & 1 deletion aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ async fn insert_document_view(
)
VALUES
($1, $2, $3)
ON CONFLICT(document_view_id) DO NOTHING -- @TODO: temp fix for double document view insertions: https://github.com/p2panda/aquadoggo/issues/398
",
)
.bind(document_view.id().to_string())
Expand Down
49 changes: 49 additions & 0 deletions aquadoggo/src/materializer/tasks/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ use crate::materializer::TaskInput;
pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
debug!("Working on {}", input);

// If this task is concerned with a document view then we can first check if it has actually
// already been materialized. If so, we exit this task immediately and return no new tasks.
if let Some(document_view_id) = &input.document_view_id {
let document_view_exists = context
.store
.get_document_by_view_id(document_view_id)
.await
.map_err(|err| TaskError::Critical(err.to_string()))?
.is_some();

if document_view_exists {
return Ok(None);
}
}

// Find out which document we are handling
let document_id = match resolve_document_id(&context, &input).await? {
Some(document_id) => Ok(document_id),
Expand Down Expand Up @@ -513,4 +528,38 @@ mod tests {
assert!(reduce_task(node.context.clone(), input).await.is_ok());
});
}

#[rstest]
fn duplicate_document_view_insertions(
#[from(populate_store_config)]
#[with(2, 1, 1)]
config: PopulateStoreConfig,
) {
test_runner(|node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
let (_, document_ids) = populate_store(&node.context.store, &config).await;
let document_id = document_ids.get(0).expect("At least one document id");

// Get the operations and build the document.
let operations = node
.context
.store
.get_operations_by_document_id(document_id)
.await
.unwrap();

// Build the document from the operations.
let document_builder = DocumentBuilder::from(&operations);
let document = document_builder.build().unwrap();

// Issue a reduce task for the document, which also inserts the current view.
let input = TaskInput::new(Some(document_id.to_owned()), None);
assert!(reduce_task(node.context.clone(), input).await.is_ok());

// Issue a reduce task for the document view, which should succeed although no new
// view is inserted.
let input = TaskInput::new(None, Some(document.view_id().to_owned()));
assert!(reduce_task(node.context.clone(), input).await.is_ok());
})
}
}