Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2562 name merging handle name merge sync messages #2674

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions server/repository/src/db_diesel/name_link_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,21 @@ impl<'a> NameLinkRowRepository<'a> {
Ok(())
}

pub async fn find_one_by_id(&self, name_link_id: &str) -> Result<NameLinkRow, RepositoryError> {
pub fn find_one_by_id(
&self,
name_link_id: &str,
) -> Result<Option<NameLinkRow>, RepositoryError> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to match pattern (return optional)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea probably for the best, though not sure when it'd ever not exist without something being quite wrong haha.

let result = name_link
.filter(name_link::id.eq(name_link_id))
.first::<NameLinkRow>(&self.connection.connection)?;
.first::<NameLinkRow>(&self.connection.connection)
.optional()?;
Ok(result)
}

pub fn find_many_by_name_id(&self, name: &str) -> Result<Vec<NameLinkRow>, RepositoryError> {
let result = name_link
.filter(name_id.eq(name))
.load::<NameLinkRow>(&self.connection.connection)?;
Ok(result)
}
}
1 change: 1 addition & 0 deletions server/service/src/sync/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ pub(crate) async fn check_records_against_database(
"DocumentRegistry"
),
ItemLink(_) => todo!(),
NameLink(_) => todo!(),
}
}

Expand Down
1 change: 1 addition & 0 deletions server/service/src/sync/translation_and_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ impl PullUpsertRecord {
DocumentRegistry(record) => DocumentRegistryRowRepository::new(con).upsert_one(record),
Document(record) => sync_upsert_document(con, record),
ItemLink(record) => ItemLinkRowRepository::new(con).upsert_one(record),
NameLink(record) => NameLinkRowRepository::new(con).upsert_one(record),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/service/src/sync/translations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub(crate) fn all_translators() -> SyncTranslators {
// Special translations
Box::new(special::NameToNameStoreJoinTranslation {}),
Box::new(special::ItemMergeTranslation {}),
Box::new(special::NameMergeTranslation {}),
]
}

Expand Down Expand Up @@ -206,6 +207,7 @@ pub(crate) enum PullUpsertRecord {
Document(Document),
DocumentRegistry(DocumentRegistryRow),
ItemLink(ItemLinkRow),
NameLink(NameLinkRow),
}

#[derive(Debug, PartialEq, Clone)]
Expand Down
5 changes: 4 additions & 1 deletion server/service/src/sync/translations/special/item_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ impl SyncTranslation for ItemMergeTranslation {
}
let indirect_link = item_link_repo
.find_one_by_id(&data.merge_id_to_keep)?
.unwrap();
.ok_or(anyhow::anyhow!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unwrap and use clearer error message

"Could not find item link with id {}",
data.merge_id_to_keep
))?;

let upsert_records: Vec<PullUpsertRecord> = item_links
.into_iter()
Expand Down
2 changes: 2 additions & 0 deletions server/service/src/sync/translations/special/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ mod name_to_name_store_join;
pub(crate) use name_to_name_store_join::*;
mod item_merge;
pub(crate) use item_merge::*;
mod name_merge;
pub(crate) use name_merge::*;
155 changes: 155 additions & 0 deletions server/service/src/sync/translations/special/name_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use repository::{NameLinkRow, NameLinkRowRepository, StorageConnection, SyncBufferRow};

use serde::Deserialize;

use crate::sync::translations::{
IntegrationRecords, LegacyTableName, PullDependency, PullUpsertRecord, SyncTranslation,
};

#[derive(Deserialize)]
pub struct NameMergeMessage {
#[serde(rename = "mergeIdToKeep")]
pub merge_id_to_keep: String,
#[serde(rename = "mergeIdToDelete")]
pub merge_id_to_delete: String,
}

pub(crate) struct NameMergeTranslation {}
impl SyncTranslation for NameMergeTranslation {
fn pull_dependencies(&self) -> PullDependency {
PullDependency {
table: LegacyTableName::NAME,
dependencies: vec![],
}
}

fn try_translate_pull_merge(
&self,
connection: &StorageConnection,
sync_record: &SyncBufferRow,
) -> Result<Option<IntegrationRecords>, anyhow::Error> {
if sync_record.table_name != LegacyTableName::NAME {
return Ok(None);
}

let data = serde_json::from_str::<NameMergeMessage>(&sync_record.data)?;

let name_link_repo = NameLinkRowRepository::new(connection);
let name_links = name_link_repo.find_many_by_name_id(&data.merge_id_to_delete)?;
if name_links.len() == 0 {
return Ok(None);
}
let indirect_link = name_link_repo
.find_one_by_id(&data.merge_id_to_keep)?
.ok_or(anyhow::anyhow!(
"Could not find name link with id {}",
data.merge_id_to_keep
))?;

let upsert_records: Vec<PullUpsertRecord> = name_links
.into_iter()
.map(|NameLinkRow { id, .. }| {
PullUpsertRecord::NameLink(NameLinkRow {
id,
name_id: indirect_link.name_id.clone(),
})
})
.collect();

Ok(Some(IntegrationRecords::from_upserts(upsert_records)))
}
}

#[cfg(test)]
mod tests {
use crate::sync::synchroniser::integrate_and_translate_sync_buffer;

use super::*;
use repository::{
mock::MockDataInserts, test_db::setup_all, NameLinkRowRepository, SyncBufferAction,
SyncBufferRow, SyncBufferRowRepository,
};

#[actix_rt::test]
async fn test_name_merge() {
let mut sync_records = vec![
SyncBufferRow {
record_id: "name_b_merge".to_string(),
table_name: LegacyTableName::NAME.to_string(),
action: SyncBufferAction::Merge,
data: r#"{
"mergeIdToKeep": "name_b",
"mergeIdToDelete": "name_a"
}"#
.to_string(),
..SyncBufferRow::default()
},
SyncBufferRow {
record_id: "name_c_merge".to_string(),
table_name: LegacyTableName::NAME.to_string(),
action: SyncBufferAction::Merge,
data: r#"{
"mergeIdToKeep": "name_c",
"mergeIdToDelete": "name_b"
}"#
.to_string(),
..SyncBufferRow::default()
},
];

let expected_name_links = vec![
NameLinkRow {
id: "name_a".to_string(),
name_id: "name_c".to_string(),
},
NameLinkRow {
id: "name_b".to_string(),
name_id: "name_c".to_string(),
},
NameLinkRow {
id: "name_c".to_string(),
name_id: "name_c".to_string(),
},
];

let (_, connection, _, _) = setup_all(
"test_name_merge_message_translation_in_order",
MockDataInserts::none().units().names(),
)
.await;

SyncBufferRowRepository::new(&connection)
.upsert_many(&sync_records)
.unwrap();
integrate_and_translate_sync_buffer(&connection, true).unwrap();

let name_link_repo = NameLinkRowRepository::new(&connection);
let mut name_links = name_link_repo
.find_many_by_name_id(&"name_c".to_string())
.unwrap();

name_links.sort_by_key(|i| i.id.to_owned());
assert_eq!(name_links, expected_name_links);

let (_, connection, _, _) = setup_all(
"test_name_merge_message_translation_in_reverse_order",
MockDataInserts::none().units().names(),
)
.await;

sync_records.reverse();
SyncBufferRowRepository::new(&connection)
.upsert_many(&sync_records)
.unwrap();

integrate_and_translate_sync_buffer(&connection, true).unwrap();

let name_link_repo = NameLinkRowRepository::new(&connection);
let mut name_links = name_link_repo
.find_many_by_name_id(&"name_c".to_string())
.unwrap();

name_links.sort_by_key(|i| i.id.to_owned());
assert_eq!(name_links, expected_name_links);
}
}