Skip to content

Commit

Permalink
Insert tags with crawl queue addition (#265)
Browse files Browse the repository at this point in the history
* Insert tags with crawl queue addition

* update to add post crawl tags

Co-authored-by: Joel Bredeson <joel@spyglass.fyi>
  • Loading branch information
travolin and Joel Bredeson authored Jan 14, 2023
1 parent 0fa4238 commit 886b71c
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions crates/entities/src/models/crawl_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ pub async fn enqueue_all(

for to_add in to_add.chunks(BATCH_SIZE) {
let owned = to_add.iter().map(|r| r.to_owned()).collect::<Vec<_>>();
let urls = to_add
.iter()
.map(|r| r.url.clone().unwrap())
.collect::<Vec<String>>();

let (sql, values) = Entity::insert_many(owned)
.query()
Expand All @@ -531,7 +535,22 @@ pub async fn enqueue_all(
))
.await
{
Ok(_) => {}
Ok(_) => {
println!("tags {:?}", overrides.tags);
if !overrides.tags.is_empty() {
let inserted_rows = Entity::find()
.filter(Column::Url.is_in(urls))
.all(db)
.await
.unwrap_or_default();
if !inserted_rows.is_empty() {
let result = insert_tags_many(&inserted_rows, db, &overrides.tags).await;
if let Err(error) = result {
log::error!("Error inserting tags for crawl {:?}", error);
}
}
}
}
Err(e) => log::error!("insert_many error: {:?}", e),
}
}
Expand All @@ -547,9 +566,10 @@ pub async fn mark_done(
if let Ok(Some(crawl)) = Entity::find_by_id(id).one(db).await {
let mut updated: ActiveModel = crawl.clone().into();
if let Some(tags) = tags {
let _ = updated.insert_tags(db, &tags).await;
if !tags.is_empty() {
let _ = updated.insert_tags(db, &tags).await;
}
}

updated.status = Set(CrawlStatus::Completed);
updated.update(db).await.ok()
} else {
Expand All @@ -573,6 +593,49 @@ pub async fn mark_failed(db: &DatabaseConnection, id: i64, retry: bool) {
}
}

/// Associates every crawl in `docs` with every tag in `tags` by inserting
/// one `crawl_tag` row per (crawl, tag) combination.
///
/// Each `(label, value)` pair is first turned into a tag record through
/// `get_or_create`. A pair whose lookup fails is logged with `log::error!`
/// and left out of the batch; it does not abort the call.
///
/// # Errors
///
/// Returns the `DbErr` produced when the batched insert is executed.
pub async fn insert_tags_many<C: ConnectionTrait>(
    docs: &[Model],
    db: &C,
    tags: &[TagPair],
) -> Result<InsertResult<crawl_tag::ActiveModel>, DbErr> {
    // Resolve the requested pairs one at a time, keeping only the ones
    // that produced a tag record.
    let mut resolved: Vec<tag::Model> = Vec::new();
    for pair in tags {
        let (label, value) = pair;
        let lookup = get_or_create(db, label.to_owned(), value).await;
        match lookup {
            Ok(found) => resolved.push(found),
            Err(err) => log::error!("{}", err),
        }
    }

    // Build the join rows: crawls on the outside, tags on the inside, so
    // all rows for one crawl are adjacent in the batch.
    let mut links: Vec<crawl_tag::ActiveModel> = Vec::new();
    for doc in docs {
        for tag_row in &resolved {
            links.push(crawl_tag::ActiveModel {
                crawl_queue_id: Set(doc.id),
                tag_id: Set(tag_row.id),
                created_at: Set(chrono::Utc::now()),
                updated_at: Set(chrono::Utc::now()),
                ..Default::default()
            });
        }
    }

    // A (crawl_queue_id, tag_id) conflict is resolved with DO NOTHING, so a
    // link that is already present does not get touched.
    let skip_existing = sea_orm::sea_query::OnConflict::columns(vec![
        crawl_tag::Column::CrawlQueueId,
        crawl_tag::Column::TagId,
    ])
    .do_nothing()
    .to_owned();

    let insert = crawl_tag::Entity::insert_many(links).on_conflict(skip_existing);
    insert.exec(db).await
}

/// Remove tasks from the crawl queue that match `rule`. Rule is expected
/// to be a SQL like statement.
pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Result<u64> {
Expand Down

0 comments on commit 886b71c

Please sign in to comment.