Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - Filesystem processing #283

Merged
merged 10 commits into from
Jan 31, 2023
2 changes: 1 addition & 1 deletion crates/client/src/components/lens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn lens_component(props: &LensProps) -> Html {
}}
{"Uninstall"}
</Btn>
<div class="ml-auto text-neutral-200">{format!("{} docs", buf)}</div>
<div class="ml-auto text-neutral-200">{format!("{buf} docs")}</div>
</div>
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/pages/wizard/menubar_help.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn menubar_help() -> Html {
html! {
<div class="my-auto">
<img src={example_img} alt="Location of the menubar menu" class="h-[128px] mx-auto my-6"/>
<div class="font-bold text-lg">{format!("Spyglass lives in your {}.", menubar_name)}</div>
<div class="font-bold text-lg">{format!("Spyglass lives in your {menubar_name}.")}</div>
<div class="text-sm text-neutral-400 px-8">
{format!("{click_str} on the icon to access your library, discover new lenses, and adjust your settings.")}
</div>
Expand Down
135 changes: 126 additions & 9 deletions crates/entities/src/models/crawl_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashSet;

use regex::RegexSet;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{OnConflict, SqliteQueryBuilder};
use sea_orm::sea_query::{OnConflict, Query, SqliteQueryBuilder};
use sea_orm::{
sea_query, ConnectionTrait, DatabaseBackend, DbBackend, FromQueryResult, InsertResult,
QueryOrder, QueryTrait, Set, Statement,
Expand Down Expand Up @@ -41,6 +41,8 @@ pub struct TaskError {
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Eq)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum CrawlStatus {
#[sea_orm(string_value = "Initial")]
Initial,
#[sea_orm(string_value = "Queued")]
Queued,
#[sea_orm(string_value = "Processing")]
Expand Down Expand Up @@ -266,6 +268,14 @@ pub async fn num_tasks_in_progress(db: &DatabaseConnection) -> anyhow::Result<u6
.await
}

/// How many file-processing tasks are currently in progress?
///
/// NOTE(review): this counts *all* tasks in the `Processing` state — identical
/// to `num_tasks_in_progress` — not just `file:` URLs. If file tasks are meant
/// to have their own inflight budget, a `Column::Url.starts_with("file:")`
/// filter may be missing here — TODO confirm intent.
pub async fn num_of_files_in_progress(db: &DatabaseConnection) -> anyhow::Result<u64, DbErr> {
    Entity::find()
        .filter(Column::Status.eq(CrawlStatus::Processing))
        .count(db)
        .await
}

/// Get the next url in the crawl queue
pub async fn dequeue(
db: &DatabaseConnection,
Expand All @@ -284,8 +294,8 @@ pub async fn dequeue(
// Prioritize any bootstrapping tasks first.
let entity = {
let result = Entity::find()
.filter(Column::Status.eq(CrawlStatus::Queued))
.filter(Column::CrawlType.eq(CrawlType::Bootstrap))
.filter(Column::Status.eq(CrawlStatus::Queued))
.one(db)
.await?;

Expand Down Expand Up @@ -317,6 +327,44 @@ pub async fn dequeue(
Ok(None)
}

/// Get the next url in the crawl queue
pub async fn dequeue_files(
db: &DatabaseConnection,
user_settings: UserSettings,
) -> anyhow::Result<Option<Model>, sea_orm::DbErr> {
// Check for inflight limits
if let Limit::Finite(inflight_crawl_limit) = user_settings.inflight_crawl_limit {
// How many do we have in progress?
let num_in_progress = num_of_files_in_progress(db).await?;
// Nothing to do if we have too many crawls
if num_in_progress >= inflight_crawl_limit as u64 {
return Ok(None);
}
}

let entity = Entity::find()
.filter(Column::Status.eq(CrawlStatus::Queued))
.filter(Column::Url.starts_with("file:"))
.one(db)
.await?;

// Grab new entity and immediately mark in-progress
if let Some(task) = entity {
let mut update: ActiveModel = task.into();
update.status = Set(CrawlStatus::Processing);
return match update.update(db).await {
Ok(model) => Ok(Some(model)),
// Deleted while being processed?
Err(err) => {
log::error!("Unable to update crawl task: {}", err);
Ok(None)
}
};
}

Ok(None)
}

pub async fn dequeue_recrawl(
db: &DatabaseConnection,
user_settings: &UserSettings,
Expand Down Expand Up @@ -451,6 +499,58 @@ fn filter_urls(
.collect::<Vec<String>>()
}

pub async fn enqueue_local_files(
db: &DatabaseConnection,
urls: &[String],
overrides: &EnqueueSettings,
pipeline: Option<String>,
) -> anyhow::Result<(), sea_orm::DbErr> {
let model = urls
.iter()
.map(|url| ActiveModel {
domain: Set(String::from("localhost")),
crawl_type: Set(overrides.crawl_type.clone()),
status: Set(CrawlStatus::Initial),
url: Set(url.to_string()),
pipeline: Set(pipeline.clone()),
..Default::default()
})
.collect::<Vec<ActiveModel>>();

let on_conflict = if overrides.is_recrawl {
OnConflict::column(Column::Url)
.update_column(Column::Status)
.to_owned()
} else {
OnConflict::column(Column::Url).do_nothing().to_owned()
};

let _insert = Entity::insert_many(model)
.on_conflict(on_conflict)
.exec(db)
.await?;
let inserted_rows = Entity::find()
.filter(Column::Url.is_in(urls.to_vec()))
.all(db)
.await?;

let ids = inserted_rows.iter().map(|row| row.id).collect::<Vec<i64>>();
let tag_rslt = insert_tags_many(&inserted_rows, db, &overrides.tags).await;
if tag_rslt.is_ok() {
let query = Query::update()
.table(Entity.table_ref())
.values([(Column::Status, CrawlStatus::Queued.into())])
.and_where(Column::Id.is_in(ids))
.to_owned();

let query = query.to_string(SqliteQueryBuilder);
db.execute(Statement::from_string(db.get_database_backend(), query))
.await?;
}

Ok(())
}

pub async fn enqueue_all(
db: &DatabaseConnection,
urls: &[String],
Expand Down Expand Up @@ -536,7 +636,6 @@ pub async fn enqueue_all(
.await
{
Ok(_) => {
println!("tags {:?}", overrides.tags);
if !overrides.tags.is_empty() {
let inserted_rows = Entity::find()
.filter(Column::Url.is_in(urls))
Expand Down Expand Up @@ -647,7 +746,7 @@ pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Resu
.map(|x| x.id)
.collect();

let rows_affected = delete_many_by_id(db.clone(), &dbids).await?;
let rows_affected = delete_many_by_id(db, &dbids).await?;
Ok(rows_affected)
}

Expand Down Expand Up @@ -694,30 +793,48 @@ pub async fn update_or_remove_task(
pub async fn delete_by_lens(db: DatabaseConnection, name: &str) -> Result<(), sea_orm::DbErr> {
if let Ok(ids) = find_by_lens(db.clone(), name).await {
let dbids: Vec<i64> = ids.iter().map(|item| item.id).collect();
delete_many_by_id(db, &dbids).await?;
delete_many_by_id(&db, &dbids).await?;
}
Ok(())
}

/// Delete multiple crawl queue entries by id. Associated tag references are
/// removed first so no orphaned `crawl_tag` rows are left behind.
///
/// Returns the number of crawl entries actually deleted.
pub async fn delete_many_by_id(
    db: &DatabaseConnection,
    dbids: &[i64],
) -> Result<u64, sea_orm::DbErr> {
    // Clear out tag associations before touching the crawl entries.
    crawl_tag::Entity::delete_many()
        .filter(crawl_tag::Column::CrawlQueueId.is_in(dbids.to_owned()))
        .exec(db)
        .await?;

    // Now remove the crawl entries themselves.
    let rows_affected = Entity::delete_many()
        .filter(Column::Id.is_in(dbids.to_owned()))
        .exec(db)
        .await?
        .rows_affected;

    Ok(rows_affected)
}

/// Delete multiple crawl entries by url. Looks up the matching entries, then
/// delegates to [`delete_many_by_id`] so related tag references are cleaned
/// up before the entries themselves are removed.
///
/// Returns the number of crawl entries deleted. Takes `&[String]` rather than
/// `&Vec<String>`; existing `&Vec<String>` callers still work via deref
/// coercion.
pub async fn delete_many_by_url(
    db: &DatabaseConnection,
    urls: &[String],
) -> Result<u64, sea_orm::DbErr> {
    let entries = Entity::find()
        .filter(Column::Url.is_in(urls.to_vec()))
        .all(db)
        .await?;

    let id_list = entries.iter().map(|entry| entry.id).collect::<Vec<i64>>();

    delete_many_by_id(db, &id_list).await
}

#[derive(Debug, FromQueryResult)]
pub struct CrawlTaskId {
pub id: i64,
Expand Down Expand Up @@ -787,7 +904,7 @@ mod test {
let sql = gen_dequeue_sql(settings);
assert_eq!(
sql.to_string(),
"WITH\nindexed AS (\n SELECT\n domain,\n count(*) as count\n FROM indexed_document\n GROUP BY domain\n),\ninflight AS (\n SELECT\n domain,\n count(*) as count\n FROM crawl_queue\n WHERE status = \"Processing\"\n GROUP BY domain\n)\nSELECT\n cq.*\nFROM crawl_queue cq\nLEFT JOIN indexed ON indexed.domain = cq.domain\nLEFT JOIN inflight ON inflight.domain = cq.domain\nWHERE\n COALESCE(indexed.count, 0) < 500000 AND\n COALESCE(inflight.count, 0) < 2 AND\n status = \"Queued\"\nORDER BY\n cq.updated_at ASC"
"WITH\nindexed AS (\n SELECT\n domain,\n count(*) as count\n FROM indexed_document\n GROUP BY domain\n),\ninflight AS (\n SELECT\n domain,\n count(*) as count\n FROM crawl_queue\n WHERE status = \"Processing\"\n GROUP BY domain\n)\nSELECT\n cq.*\nFROM crawl_queue cq\nLEFT JOIN indexed ON indexed.domain = cq.domain\nLEFT JOIN inflight ON inflight.domain = cq.domain\nWHERE\n COALESCE(indexed.count, 0) < 500000 AND\n COALESCE(inflight.count, 0) < 2 AND\n status = \"Queued\" and\n url not like \"file%\"\nORDER BY\n cq.updated_at ASC"
);
}

Expand Down
67 changes: 67 additions & 0 deletions crates/entities/src/models/indexed_document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,36 @@ pub async fn insert_tags_many<C: ConnectionTrait>(
.await
}

/// Associate the given tag ids with a single indexed document.
///
/// Duplicate `(document, tag)` pairs are ignored via `ON CONFLICT DO NOTHING`.
///
/// NOTE(review): an empty `tags` slice produces an empty `insert_many` —
/// verify the sea-orm version in use tolerates empty inserts, or guard at the
/// call site.
pub async fn insert_tags_for_document<C: ConnectionTrait>(
    doc: &Model,
    db: &C,
    tags: &[u64],
) -> Result<InsertResult<document_tag::ActiveModel>, DbErr> {
    // Take a single timestamp so created_at == updated_at for every row in
    // the batch, instead of calling `Utc::now()` twice per tag in the loop.
    let now = chrono::Utc::now();
    let doc_tags = tags
        .iter()
        .map(|tag_id| document_tag::ActiveModel {
            indexed_document_id: Set(doc.id),
            tag_id: Set(*tag_id as i64),
            created_at: Set(now),
            updated_at: Set(now),
            ..Default::default()
        })
        .collect::<Vec<document_tag::ActiveModel>>();

    // Insert connections, ignoring duplicates
    document_tag::Entity::insert_many(doc_tags)
        .on_conflict(
            sea_orm::sea_query::OnConflict::columns(vec![
                document_tag::Column::IndexedDocumentId,
                document_tag::Column::TagId,
            ])
            .do_nothing()
            .to_owned(),
        )
        .exec(db)
        .await
}

/// Remove documents from the indexed_document table that match `rule`. Rule is expected
/// to be a SQL like statement.
pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Result<Vec<String>> {
Expand Down Expand Up @@ -207,6 +237,43 @@ pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Resu
.collect::<Vec<String>>())
}

/// Delete multiple indexed documents by id. Related tag references are
/// dropped first so no orphaned `document_tag` rows remain.
///
/// Returns the number of documents actually deleted.
pub async fn delete_many_by_id(
    db: &DatabaseConnection,
    dbids: &[i64],
) -> Result<u64, sea_orm::DbErr> {
    // Drop tag associations before the documents themselves.
    document_tag::Entity::delete_many()
        .filter(document_tag::Column::IndexedDocumentId.is_in(dbids.to_owned()))
        .exec(db)
        .await?;

    // Remove the documents and report how many went away.
    let deleted = Entity::delete_many()
        .filter(Column::Id.is_in(dbids.to_owned()))
        .exec(db)
        .await?;

    Ok(deleted.rows_affected)
}

/// Delete multiple indexed documents by url. Resolves the matching rows,
/// then hands their ids to `delete_many_by_id` so tag references are cleaned
/// up before the documents are removed.
///
/// Returns the number of documents deleted.
pub async fn delete_many_by_url(
    db: &DatabaseConnection,
    urls: Vec<String>,
) -> Result<u64, sea_orm::DbErr> {
    let matching = Entity::find()
        .filter(Column::Url.is_in(urls))
        .all(db)
        .await?;

    let ids: Vec<i64> = matching.iter().map(|doc| doc.id).collect();

    delete_many_by_id(db, &ids).await
}

#[derive(Debug, FromQueryResult)]
pub struct IndexedDocumentId {
pub id: i64,
Expand Down
1 change: 1 addition & 0 deletions crates/entities/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod fetch_history;
pub mod indexed_document;
pub mod lens;
pub mod link;
pub mod processed_files;
pub mod resource_rule;
pub mod tag;

Expand Down
Loading