diff --git a/CHANGELOG.md b/CHANGELOG.md index 08a8dd46f..b4b66da76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Naive protocol replication [#380](https://github.com/p2panda/aquadoggo/pull/380) - Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞 - Reverse lookup for pinned relations in dependency task [#434](https://github.com/p2panda/aquadoggo/pull/434) +- Persist and maintain index of operation's position in document [#438](https://github.com/p2panda/aquadoggo/pull/438) ### Changed diff --git a/aquadoggo/migrations/20230523135826_alter-operations.sql b/aquadoggo/migrations/20230523135826_alter-operations.sql new file mode 100644 index 000000000..a894d8117 --- /dev/null +++ b/aquadoggo/migrations/20230523135826_alter-operations.sql @@ -0,0 +1,3 @@ +-- SPDX-License-Identifier: AGPL-3.0-or-later + +ALTER TABLE operations_v1 ADD COLUMN sorted_index INT; \ No newline at end of file diff --git a/aquadoggo/src/db/models/operation.rs b/aquadoggo/src/db/models/operation.rs index 4f8c8656b..67d0f1b16 100644 --- a/aquadoggo/src/db/models/operation.rs +++ b/aquadoggo/src/db/models/operation.rs @@ -25,6 +25,12 @@ pub struct OperationRow { /// The previous operations of this operation concatenated into string format with `_` /// separator. pub previous: Option, + + /// Index of this operation once topological sorting of the operation graph has been performed. + /// + /// If this value is `None` we can assume the operation has not been processed yet and we are + /// waiting for the `reduce` task to complete materialization. + pub sorted_index: Option, } /// A struct representing a single operation field row as it is inserted in the database. @@ -99,4 +105,10 @@ pub struct OperationFieldsJoinedRow { /// This numeric value is a simple list index to represent multiple values within one operation /// field. 
pub list_index: Option, + + /// Index of this operation once topological sorting of the operation graph has been performed. + /// + /// If this value is `None` we can assume the operation has not been processed yet and we are + /// waiting for the `reduce` task to complete materialization. + pub sorted_index: Option, } diff --git a/aquadoggo/src/db/models/utils.rs b/aquadoggo/src/db/models/utils.rs index eb89aedea..f35dc9c54 100644 --- a/aquadoggo/src/db/models/utils.rs +++ b/aquadoggo/src/db/models/utils.rs @@ -35,6 +35,7 @@ pub fn parse_operation_rows( let public_key = PublicKey::new(&first_row.public_key).unwrap(); let operation_id = first_row.operation_id.parse().unwrap(); let document_id = first_row.document_id.parse().unwrap(); + let sorted_index = first_row.sorted_index; let mut relation_lists: BTreeMap> = BTreeMap::new(); let mut pinned_relation_lists: BTreeMap> = BTreeMap::new(); @@ -188,6 +189,7 @@ pub fn parse_operation_rows( previous: operation.previous(), fields: operation.fields(), public_key, + sorted_index, }; Some(operation) @@ -431,6 +433,7 @@ mod tests { field_type: Some("int".to_string()), value: Some("28".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -449,6 +452,7 @@ mod tests { field_type: Some("float".to_string()), value: Some("3.5".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -467,6 +471,7 @@ mod tests { field_type: Some("bool".to_string()), value: Some("false".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -488,6 +493,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: 
"2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -509,6 +515,7 @@ mod tests { .to_string(), ), list_index: Some(1), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -530,6 +537,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -551,6 +559,7 @@ mod tests { .to_string(), ), list_index: Some(1), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -572,6 +581,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -593,6 +603,7 @@ mod tests { .to_string(), ), list_index: Some(1), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -614,6 +625,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -635,6 +647,7 @@ mod tests { .to_string(), ), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -653,6 +666,7 @@ mod tests { field_type: Some("str".to_string()), value: Some("bubu".to_string()), list_index: Some(0), + sorted_index: None, }, OperationFieldsJoinedRow { public_key: "2f8e50c2ede6d936ecc3144187ff1c273808185cfbc5ff3d3748d1ff7353fc96" @@ -671,6 +685,7 @@ mod tests { field_type: Some("pinned_relation_list".to_string()), value: None, list_index: Some(0), + sorted_index: None, }, ]; diff --git a/aquadoggo/src/db/stores/operation.rs b/aquadoggo/src/db/stores/operation.rs index d93a93e35..d70e438b1 100644 
--- a/aquadoggo/src/db/stores/operation.rs +++ b/aquadoggo/src/db/stores/operation.rs @@ -1,6 +1,5 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::collections::BTreeMap; use std::fmt::Display; use async_trait::async_trait; @@ -12,7 +11,7 @@ use p2panda_rs::operation::{Operation, OperationId}; use p2panda_rs::schema::SchemaId; use p2panda_rs::storage_provider::error::OperationStorageError; use p2panda_rs::storage_provider::traits::OperationStore; -use sqlx::{query, query_as, query_scalar}; +use sqlx::{query, query_as, query_scalar, Any}; use crate::db::models::utils::{parse_operation_rows, parse_value_to_string_vec}; use crate::db::models::{DocumentViewFieldRow, OperationFieldsJoinedRow}; @@ -61,15 +60,12 @@ impl OperationStore for SqlStore { /// Insert an operation into storage. /// - /// This requires a DoggoOperation to be composed elsewhere, it contains an `PublicKey`, - /// `DocumentId`, `OperationId` and the actual `Operation` we want to store. + /// The `PublicKey` is determined by the author who created the operation, the `DocumentId` is + /// of the document this operation is part of and the `OperationId` is derived from the hash of + /// the `Entry` it was published with. /// - /// Returns a result containing `true` when one insertion occured, and false when no insertions - /// occured. Errors when a fatal storage error occurs. - /// - /// In aquadoggo we store an operation in the database in three different tables: `operations`, - /// `previous` and `operation_fields`. This means that this method actually makes 3 - /// different sets of insertions. + /// The `sorted_index` of the inserted operation will be set to `None` as this value is only + /// available once materialization completed. Use `update_operation_index` to set this value. 
async fn insert_operation( &self, id: &OperationId, @@ -77,93 +73,8 @@ impl OperationStore for SqlStore { operation: &Operation, document_id: &DocumentId, ) -> Result<(), OperationStorageError> { - // Start a transaction, any db insertions after this point, and before the `commit()` will - // be rolled back in the event of an error. - let mut tx = self - .pool - .begin() + self.insert_operation_with_index(id, public_key, operation, document_id, None) .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - // Construct query for inserting operation an row, execute it and check exactly one row was - // affected. - query( - " - INSERT INTO - operations_v1 ( - public_key, - document_id, - operation_id, - action, - schema_id, - previous - ) - VALUES - ($1, $2, $3, $4, $5, $6) - ", - ) - .bind(public_key.to_string()) - .bind(document_id.as_str()) - .bind(id.as_str()) - .bind(operation.action().as_str()) - .bind(operation.schema_id().to_string()) - .bind( - operation - .previous() - .map(|document_view_id| document_view_id.to_string()), - ) - .execute(&mut tx) - .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - let mut results = Vec::new(); - if let Some(fields) = operation.fields() { - for (name, value) in fields.iter() { - // If the value is a relation_list or pinned_relation_list we need to insert a new - // field row for every item in the list. Here we collect these items and return - // them in a vector. If this operation value is anything except for the above list - // types, we will return a vec containing a single item. 
- let db_values = parse_value_to_string_vec(value); - - for (index, db_value) in db_values.into_iter().enumerate() { - let cursor = OperationCursor::new(index, name, id); - - let result = query( - " - INSERT INTO - operation_fields_v1 ( - operation_id, - name, - field_type, - value, - list_index, - cursor - ) - VALUES - ($1, $2, $3, $4, $5, $6) - ", - ) - .bind(id.as_str().to_owned()) - .bind(name.to_owned()) - .bind(value.field_type().to_string()) - .bind(db_value) - .bind(index as i32) - .bind(cursor.to_string()) - .execute(&mut tx) - .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - results.push(result); - } - } - }; - - // Commit the transaction. - tx.commit() - .await - .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - - Ok(()) } /// Get an operation identified by it's `OperationId`. @@ -183,6 +94,7 @@ impl OperationStore for SqlStore { operations_v1.action, operations_v1.schema_id, operations_v1.previous, + operations_v1.sorted_index, operation_fields_v1.name, operation_fields_v1.field_type, operation_fields_v1.value, @@ -221,19 +133,22 @@ impl OperationStore for SqlStore { operations_v1.action, operations_v1.schema_id, operations_v1.previous, + operations_v1.sorted_index, operation_fields_v1.name, operation_fields_v1.field_type, operation_fields_v1.value, operation_fields_v1.list_index FROM operations_v1 - LEFT JOIN operation_fields_v1 - ON - operation_fields_v1.operation_id = operations_v1.operation_id + LEFT JOIN operation_fields_v1 + ON operation_fields_v1.operation_id = operations_v1.operation_id WHERE operations_v1.document_id = $1 ORDER BY - operation_fields_v1.list_index ASC + -- order the operations by their index when topologically sorted, in the case where + -- this may not be set yet we fall back to ordering by operation id. In both cases + -- we additionally order by list index. 
+ operations_v1.sorted_index ASC, operations_v1.operation_id ASC, operation_fields_v1.list_index ASC ", ) .bind(id.as_str()) @@ -241,26 +156,11 @@ impl OperationStore for SqlStore { .await .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - let mut grouped_operation_rows: BTreeMap> = - BTreeMap::new(); - - for operation_row in operation_rows { - if let Some(current_operations) = - grouped_operation_rows.get_mut(&operation_row.operation_id) - { - current_operations.push(operation_row) - } else { - grouped_operation_rows - .insert(operation_row.clone().operation_id, vec![operation_row]); - }; + if operation_rows.is_empty() { + return Ok(vec![]); } - let operations: Vec = grouped_operation_rows - .iter() - .filter_map(|(_id, operation_rows)| parse_operation_rows(operation_rows.to_owned())) - .collect(); - - Ok(operations) + Ok(group_and_parse_operation_rows(operation_rows)) } /// Get all operations that are part of a given document. @@ -277,19 +177,19 @@ impl OperationStore for SqlStore { operations_v1.action, operations_v1.schema_id, operations_v1.previous, + operations_v1.sorted_index, operation_fields_v1.name, operation_fields_v1.field_type, operation_fields_v1.value, operation_fields_v1.list_index FROM operations_v1 - LEFT JOIN operation_fields_v1 - ON - operation_fields_v1.operation_id = operations_v1.operation_id + LEFT JOIN operation_fields_v1 + ON operation_fields_v1.operation_id = operations_v1.operation_id WHERE operations_v1.schema_id = $1 ORDER BY - operation_fields_v1.list_index ASC + operations_v1.operation_id ASC, operation_fields_v1.list_index ASC ", ) .bind(id.to_string()) @@ -297,26 +197,177 @@ impl OperationStore for SqlStore { .await .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; - let mut grouped_operation_rows: BTreeMap> = - BTreeMap::new(); - - for operation_row in operation_rows { - if let Some(current_operations) = - grouped_operation_rows.get_mut(&operation_row.operation_id) - { - 
current_operations.push(operation_row) - } else { - grouped_operation_rows - .insert(operation_row.clone().operation_id, vec![operation_row]); - }; + if operation_rows.is_empty() { + return Ok(vec![]); } - let operations: Vec = grouped_operation_rows - .iter() - .filter_map(|(_id, operation_rows)| parse_operation_rows(operation_rows.to_owned())) - .collect(); + Ok(group_and_parse_operation_rows(operation_rows)) + } +} + +/// Parse a collection of operation rows from multiple operations, into a list of `StorageOperation`. +/// +/// Expects the rows to be grouped by operation id. +fn group_and_parse_operation_rows( + operation_rows: Vec, +) -> Vec { + // We need to group all the operation rows so they can be parsed into operations. They come + // from the database ordered by their index once topologically sorted when present, otherwise + // by operation id. List items are additionally ordered by their list index. + let mut grouped_operation_rows = vec![]; + + let mut current_operation_id = operation_rows.first().unwrap().operation_id.clone(); + let mut current_operation_rows = vec![]; + + for row in operation_rows { + if row.operation_id == current_operation_id { + // If this row is part of the current operation push it to the current rows vec. + current_operation_rows.push(row); + } else { + // If we've moved on to the next operation, then push the complete vec of operation + // rows to the grouped rows collection and then setup for the next iteration. + grouped_operation_rows.push(current_operation_rows.clone()); + current_operation_id = row.operation_id.clone(); + current_operation_rows = vec![row]; + } + } + + // Push the final operation to the grouped rows. + grouped_operation_rows.push(current_operation_rows); + + // Parse all the operation rows into operations. + grouped_operation_rows + .into_iter() + .filter_map(parse_operation_rows) + .collect() +} + +impl SqlStore { + /// Update the sorted index of an operation. 
This method is used in `reduce` tasks as each + operation is processed. + pub async fn update_operation_index( + &self, + operation_id: &OperationId, + sorted_index: i32, + ) -> Result<(), OperationStorageError> { + query::( + " + UPDATE + operations_v1 + SET + sorted_index = $2 + WHERE + operation_id = $1 + ", + ) + .bind(operation_id.as_str()) + .bind(sorted_index) + .fetch_all(&self.pool) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + Ok(()) + } - Ok(operations) + /// Insert an operation as well as the index for its position in the document after + /// materialization has occurred. + async fn insert_operation_with_index( + &self, + id: &OperationId, + public_key: &PublicKey, + operation: &Operation, + document_id: &DocumentId, + sorted_index: Option, + ) -> Result<(), OperationStorageError> { + // Start a transaction, any db insertions after this point, and before the `commit()` will + // be rolled back in the event of an error. + let mut tx = self + .pool + .begin() + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + // Construct query for inserting an operation row, execute it and check exactly one row was + // affected.
+ query( + " + INSERT INTO + operations_v1 ( + public_key, + document_id, + operation_id, + action, + schema_id, + previous, + sorted_index + ) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + ", + ) + .bind(public_key.to_string()) + .bind(document_id.as_str()) + .bind(id.as_str()) + .bind(operation.action().as_str()) + .bind(operation.schema_id().to_string()) + .bind( + operation + .previous() + .map(|document_view_id| document_view_id.to_string()), + ) + .bind(sorted_index) + .execute(&mut tx) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + let mut results = Vec::new(); + if let Some(fields) = operation.fields() { + for (name, value) in fields.iter() { + // If the value is a relation_list or pinned_relation_list we need to insert a new + // field row for every item in the list. Here we collect these items and return + // them in a vector. If this operation value is anything except for the above list + // types, we will return a vec containing a single item. + let db_values = parse_value_to_string_vec(value); + + for (index, db_value) in db_values.into_iter().enumerate() { + let cursor = OperationCursor::new(index, name, id); + + let result = query( + " + INSERT INTO + operation_fields_v1 ( + operation_id, + name, + field_type, + value, + list_index, + cursor + ) + VALUES + ($1, $2, $3, $4, $5, $6) + ", + ) + .bind(id.as_str().to_owned()) + .bind(name.to_owned()) + .bind(value.field_type().to_string()) + .bind(db_value) + .bind(index as i32) + .bind(cursor.to_string()) + .execute(&mut tx) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + results.push(result); + } + } + }; + + // Commit the transaction. 
+ tx.commit() + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + Ok(()) } } @@ -359,7 +410,8 @@ impl From<&DocumentViewFieldRow> for OperationCursor { #[cfg(test)] mod tests { - use p2panda_rs::document::DocumentId; + use p2panda_rs::document::materialization::build_graph; + use p2panda_rs::document::{DocumentBuilder, DocumentId}; use p2panda_rs::identity::PublicKey; use p2panda_rs::operation::traits::{AsOperation, WithPublicKey}; use p2panda_rs::operation::{Operation, OperationAction, OperationBuilder, OperationId}; @@ -370,11 +422,13 @@ mod tests { document_id, operation, operation_id, operation_with_schema, public_key, random_document_view_id, random_operation_id, random_previous_operations, schema_id, }; - use p2panda_rs::test_utils::memory_store::helpers::{populate_store, PopulateStoreConfig}; + use p2panda_rs::test_utils::memory_store::helpers::PopulateStoreConfig; use p2panda_rs::WithId; use rstest::rstest; - use crate::test_utils::{doggo_fields, populate_store_config, test_runner, TestNode}; + use crate::test_utils::{ + doggo_fields, populate_and_materialize, populate_store_config, test_runner, TestNode, + }; use super::OperationCursor; @@ -531,9 +585,9 @@ mod tests { #[with(10, 1, 1)] config: PopulateStoreConfig, ) { - test_runner(|node: TestNode| async move { - // Populate the store with some entries and operations but DON'T materialise any resulting documents. - let (_, document_ids) = populate_store(&node.context.store, &config).await; + test_runner(|mut node: TestNode| async move { + // Populate the store with some entries and operations and materialize documents. + let (_, document_ids) = populate_and_materialize(&mut node, &config).await; let document_id = document_ids.get(0).expect("At least one document id"); let operations_by_document_id = node @@ -544,7 +598,14 @@ mod tests { .expect("Get operations by their document id"); // We expect the number of operations returned to match the expected number. 
- assert_eq!(operations_by_document_id.len(), 10) + assert_eq!(operations_by_document_id.len(), 10); + + // The operations should be in their topologically sorted order. + let operations = DocumentBuilder::from(&operations_by_document_id).operations(); + let expected_operation_order = + build_graph(&operations).unwrap().sort().unwrap().sorted(); + + assert_eq!(operations, expected_operation_order); }); } diff --git a/aquadoggo/src/db/types/operation.rs b/aquadoggo/src/db/types/operation.rs index 231a89239..2af007136 100644 --- a/aquadoggo/src/db/types/operation.rs +++ b/aquadoggo/src/db/types/operation.rs @@ -7,7 +7,7 @@ use p2panda_rs::operation::{OperationAction, OperationFields, OperationId, Opera use p2panda_rs::schema::SchemaId; use p2panda_rs::WithId; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct StorageOperation { /// The id of the document this operation is part of. pub(crate) document_id: DocumentId, @@ -32,6 +32,12 @@ pub struct StorageOperation { /// The public key of the key pair used to publish this operation. pub(crate) public_key: PublicKey, + + /// Index for the position of this operation once topological sorting of the operation graph + /// has been performed. + /// + /// Is `None` when the operation has not been materialized into its document yet.
+ pub(crate) sorted_index: Option, } impl WithPublicKey for StorageOperation { diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index b1be1ee73..aa69d0759 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -3,6 +3,7 @@ use std::convert::TryFrom; use log::{debug, info, trace}; +use p2panda_rs::document::materialization::build_graph; use p2panda_rs::document::traits::AsDocument; use p2panda_rs::document::{Document, DocumentBuilder, DocumentId, DocumentViewId}; use p2panda_rs::operation::traits::{AsOperation, WithPublicKey}; @@ -206,6 +207,20 @@ async fn reduce_document + WithPublicKey>( if document_view_exists { return Ok(None); + }; + + // @TODO: Make sorted operations available after building the document above so we can skip this step. + let operations = DocumentBuilder::from(operations).operations(); + let sorted_operations = build_graph(&operations).unwrap().sort().unwrap().sorted(); + + // Iterate over the sorted document operations and update their sorted index on the + // operations_v1 table. + for (index, (operation_id, _, _)) in sorted_operations.iter().enumerate() { + context + .store + .update_operation_index(operation_id, index as i32) + .await + .map_err(|err| TaskError::Critical(err.to_string()))?; } // Insert this document into storage. 
If it already existed, this will update it's @@ -260,7 +275,11 @@ mod tests { use p2panda_rs::document::materialization::build_graph; use p2panda_rs::document::traits::AsDocument; - use p2panda_rs::document::{Document, DocumentBuilder, DocumentId, DocumentViewId}; + use p2panda_rs::document::{ + Document, DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId, + DocumentViewValue, + }; + use p2panda_rs::operation::traits::AsOperation; use p2panda_rs::operation::OperationValue; use p2panda_rs::schema::Schema; use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore}; @@ -271,6 +290,7 @@ mod tests { use p2panda_rs::test_utils::memory_store::helpers::{ populate_store, send_to_store, PopulateStoreConfig, }; + use p2panda_rs::WithId; use rstest::rstest; use crate::materializer::tasks::reduce_task; @@ -584,4 +604,108 @@ mod tests { assert!(reduce_task(node.context.clone(), input).await.is_ok()); }) } + + #[rstest] + fn updates_operations_sorted_index( + schema: Schema, + #[from(populate_store_config)] + #[with( + 3, + 1, + 1, + false, + constants::schema(), + constants::test_fields(), + constants::test_fields() + )] + config: PopulateStoreConfig, + ) { + test_runner(move |node: TestNode| async move { + // Populate the store with some entries and operations but DON'T materialise any resulting documents. + let (key_pairs, document_ids) = populate_store(&node.context.store, &config).await; + let document_id = document_ids + .get(0) + .expect("There should be at least one document id"); + + let key_pair = key_pairs + .get(0) + .expect("There should be at least one key_pair"); + + // Now we create and insert an UPDATE operation for this document which is pointing at + // the root CREATE operation. 
+ let (_, _) = send_to_store( + &node.context.store, + &operation( + Some(operation_fields(vec![( + "username", + OperationValue::String("hello".to_string()), + )])), + Some(document_id.as_str().parse().unwrap()), + schema.id().to_owned(), + ), + &schema, + key_pair, + ) + .await + .unwrap(); + + // Before running the reduce task retrieve the operations. These should not be in + // their topologically sorted order. + let pre_materialization_operations = node + .context + .store + .get_operations_by_document_id(&document_id) + .await + .unwrap(); + + // Run a reduce task with the document id as input. + let input = TaskInput::DocumentId(document_id.clone()); + assert!(reduce_task(node.context.clone(), input.clone()) + .await + .is_ok()); + + // Retrieve the operations again, they should now be in their topologically sorted order. + let post_materialization_operations = node + .context + .store + .get_operations_by_document_id(&document_id) + .await + .unwrap(); + + // Check we got 4 operations both times. + assert_eq!(pre_materialization_operations.len(), 4); + assert_eq!(post_materialization_operations.len(), 4); + // Check the ordering is different. + assert_ne!( + pre_materialization_operations, + post_materialization_operations + ); + + // The first operation should be a CREATE. + let create_operation = post_materialization_operations.first().unwrap(); + assert!(create_operation.is_create()); + + // Reduce the operations to a document view. + let mut document_view_fields = DocumentViewFields::new(); + for operation in post_materialization_operations { + let fields = operation.fields().unwrap(); + for (key, value) in fields.iter() { + let document_view_value = DocumentViewValue::new(operation.id(), value); + document_view_fields.insert(key, document_view_value); + } + } + + // Retrieve the expected document from the store. 
+ let expected_document = node + .context + .store + .get_document(document_id) + .await + .unwrap() + .unwrap(); + + // The fields should be the same. + assert_eq!(document_view_fields, *expected_document.fields().unwrap()); + }) + } }