From cc8614292a35dcfabc10e41603c8104c029b71a1 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Fri, 27 Jan 2023 12:38:21 -0800 Subject: [PATCH 1/9] Initial file processing code updates --- crates/entities/src/models/crawl_queue.rs | 138 +++- .../entities/src/models/indexed_document.rs | 37 + crates/entities/src/models/mod.rs | 1 + crates/entities/src/models/processed_files.rs | 75 ++ crates/entities/src/models/sql/dequeue.sqlx | 3 +- crates/entities/src/models/tag.rs | 7 + crates/migrations/src/lib.rs | 2 + .../src/m20230126_000001_create_file_table.rs | 38 + crates/spyglass/Cargo.toml | 7 +- crates/spyglass/src/crawler/mod.rs | 187 +++-- crates/spyglass/src/documents/mod.rs | 46 ++ crates/spyglass/src/filesystem/mod.rs | 653 ++++++++++++++++++ crates/spyglass/src/filesystem/utils.rs | 239 +++++++ crates/spyglass/src/lib.rs | 2 + crates/spyglass/src/main.rs | 6 + crates/spyglass/src/state.rs | 7 + crates/spyglass/src/task.rs | 2 +- crates/spyglass/src/task/manager.rs | 45 +- 18 files changed, 1436 insertions(+), 59 deletions(-) create mode 100644 crates/entities/src/models/processed_files.rs create mode 100644 crates/migrations/src/m20230126_000001_create_file_table.rs create mode 100644 crates/spyglass/src/documents/mod.rs create mode 100644 crates/spyglass/src/filesystem/mod.rs create mode 100644 crates/spyglass/src/filesystem/utils.rs diff --git a/crates/entities/src/models/crawl_queue.rs b/crates/entities/src/models/crawl_queue.rs index b30873932..3a6c824e5 100644 --- a/crates/entities/src/models/crawl_queue.rs +++ b/crates/entities/src/models/crawl_queue.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use regex::RegexSet; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{OnConflict, SqliteQueryBuilder}; +use sea_orm::sea_query::{OnConflict, Query, SqliteQueryBuilder}; use sea_orm::{ sea_query, ConnectionTrait, DatabaseBackend, DbBackend, FromQueryResult, InsertResult, QueryOrder, QueryTrait, Set, Statement, @@ -41,6 +41,8 @@ pub struct TaskError { #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Eq)] #[sea_orm(rs_type = "String", db_type = "String(None)")] pub enum CrawlStatus { + #[sea_orm(string_value = "Initial")] + Initial, #[sea_orm(string_value = "Queued")] Queued, #[sea_orm(string_value = "Processing")] @@ -266,6 +268,14 @@ pub async fn num_tasks_in_progress(db: &DatabaseConnection) -> anyhow::Result anyhow::Result { + Entity::find() + .filter(Column::Status.eq(CrawlStatus::Processing)) + .count(db) + .await +} + /// Get the next url in the crawl queue pub async fn dequeue( db: &DatabaseConnection, @@ -284,8 +294,8 @@ pub async fn dequeue( // Prioritize any bootstrapping tasks first. let entity = { let result = Entity::find() - .filter(Column::Status.eq(CrawlStatus::Queued)) .filter(Column::CrawlType.eq(CrawlType::Bootstrap)) + .filter(Column::Status.eq(CrawlStatus::Queued)) .one(db) .await?; @@ -293,10 +303,12 @@ pub async fn dequeue( Some(task) } else { // Otherwise, grab a URL off the stack & send it back. - Entity::find() + let val = Entity::find() .from_raw_sql(gen_dequeue_sql(user_settings)) .one(db) - .await? + .await?; + log::error!("Got one {:?}", val); + val } }; @@ -317,6 +329,44 @@ pub async fn dequeue( Ok(None) } +/// Get the next url in the crawl queue +pub async fn dequeue_files( + db: &DatabaseConnection, + user_settings: UserSettings, +) -> anyhow::Result, sea_orm::DbErr> { + // Check for inflight limits + if let Limit::Finite(inflight_crawl_limit) = user_settings.inflight_crawl_limit { + // How many do we have in progress? 
+ let num_in_progress = num_of_files_in_progress(db).await?; + // Nothing to do if we have too many crawls + if num_in_progress >= inflight_crawl_limit as u64 { + return Ok(None); + } + } + + let entity = Entity::find() + .filter(Column::Status.eq(CrawlStatus::Queued)) + .filter(Column::Url.starts_with("file:")) + .one(db) + .await?; + + // Grab new entity and immediately mark in-progress + if let Some(task) = entity { + let mut update: ActiveModel = task.into(); + update.status = Set(CrawlStatus::Processing); + return match update.update(db).await { + Ok(model) => Ok(Some(model)), + // Deleted while being processed? + Err(err) => { + log::error!("Unable to update crawl task: {}", err); + Ok(None) + } + }; + } + + Ok(None) +} + pub async fn dequeue_recrawl( db: &DatabaseConnection, user_settings: &UserSettings, @@ -451,6 +501,58 @@ fn filter_urls( .collect::>() } +pub async fn enqueue_local_files( + db: &DatabaseConnection, + urls: &[String], + overrides: &EnqueueSettings, + pipeline: Option, +) -> anyhow::Result<(), sea_orm::DbErr> { + let model = urls + .iter() + .map(|url| ActiveModel { + domain: Set(String::from("localhost")), + crawl_type: Set(overrides.crawl_type.clone()), + status: Set(CrawlStatus::Initial), + url: Set(url.to_string()), + pipeline: Set(pipeline.clone()), + ..Default::default() + }) + .collect::>(); + + let on_conflict = if overrides.is_recrawl { + OnConflict::column(Column::Url) + .update_column(Column::Status) + .to_owned() + } else { + OnConflict::column(Column::Url).do_nothing().to_owned() + }; + + let insert = Entity::insert_many(model) + .on_conflict(on_conflict) + .exec(db) + .await?; + let inserted_rows = Entity::find() + .filter(Column::Url.is_in(urls.to_vec())) + .all(db) + .await?; + + let ids = inserted_rows.iter().map(|row| row.id).collect::>(); + let tag_rslt = insert_tags_many(&inserted_rows, db, &overrides.tags).await; + if let Ok(_) = tag_rslt { + let query = Query::update() + .table(Entity.table_ref()) + .values([(Column::Status, CrawlStatus::Queued.into())]) + .and_where(Column::Id.is_in(ids)) + .to_owned(); + + let query = query.to_string(SqliteQueryBuilder); + db.execute(Statement::from_string(db.get_database_backend(), query)) + .await?; + } + + Ok(()) +} + pub async fn enqueue_all( db: &DatabaseConnection, urls: &[String], @@ -647,7 +749,7 @@ pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Resu .map(|x| x.id) .collect(); - let rows_affected = delete_many_by_id(db.clone(), &dbids).await?; + let rows_affected = delete_many_by_id(&db, &dbids).await?; Ok(rows_affected) } @@ -694,30 +796,48 @@ pub async fn update_or_remove_task( pub async fn delete_by_lens(db: DatabaseConnection, name: &str) -> Result<(), sea_orm::DbErr> { if let Ok(ids) = find_by_lens(db.clone(), name).await { let dbids: Vec = ids.iter().map(|item| item.id).collect(); - delete_many_by_id(db, &dbids).await?; + delete_many_by_id(&db, &dbids).await?; } Ok(()) } +/// Helper method used to delete multiple crawl entries by id. 
This method will first +/// delete all related tag references before deleting the crawl entries pub async fn delete_many_by_id( - db: DatabaseConnection, + db: &DatabaseConnection, dbids: &[i64], ) -> Result { // Delete all associated tags crawl_tag::Entity::delete_many() .filter(crawl_tag::Column::CrawlQueueId.is_in(dbids.to_owned())) - .exec(&db) + .exec(db) .await?; // Delete item let res = Entity::delete_many() .filter(Column::Id.is_in(dbids.to_owned())) - .exec(&db) + .exec(db) .await?; Ok(res.rows_affected) } +/// Helper method used to delete multiple crawl entries by url. This method will first +/// delete all related tag references before deleting the crawl entries +pub async fn delete_many_by_url( + db: &DatabaseConnection, + urls: &Vec, +) -> Result { + let entries = Entity::find() + .filter(Column::Url.is_in(urls.to_owned())) + .all(db) + .await?; + + let id_list = entries.iter().map(|entry| entry.id).collect::>(); + + delete_many_by_id(db, &id_list).await +} + #[derive(Debug, FromQueryResult)] pub struct CrawlTaskId { pub id: i64, diff --git a/crates/entities/src/models/indexed_document.rs b/crates/entities/src/models/indexed_document.rs index 75f14e0af..0e2b9c07c 100644 --- a/crates/entities/src/models/indexed_document.rs +++ b/crates/entities/src/models/indexed_document.rs @@ -207,6 +207,43 @@ pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Resu .collect::>()) } +/// Helper method used to delete multiple documents by id. This method will first +/// delete all related tag references before deleting the documents +pub async fn delete_many_by_id( + db: &DatabaseConnection, + dbids: &[i64], +) -> Result { + // Delete all associated tags + document_tag::Entity::delete_many() + .filter(document_tag::Column::IndexedDocumentId.is_in(dbids.to_owned())) + .exec(db) + .await?; + + // Delete item + let res = Entity::delete_many() + .filter(Column::Id.is_in(dbids.to_owned())) + .exec(db) + .await?; + + Ok(res.rows_affected) +} + +/// Helper method used to delete multiple documents by url. This method will first +/// delete all related tag references before deleting the documents +pub async fn delete_many_by_url( + db: &DatabaseConnection, + urls: Vec, +) -> Result { + let entries = Entity::find() + .filter(Column::Url.is_in(urls)) + .all(db) + .await?; + + let id_list = entries.iter().map(|entry| entry.id).collect::>(); + + delete_many_by_id(db, &id_list).await +} + #[derive(Debug, FromQueryResult)] pub struct IndexedDocumentId { pub id: i64, diff --git a/crates/entities/src/models/mod.rs b/crates/entities/src/models/mod.rs index b6f369cf9..f6c51b195 100644 --- a/crates/entities/src/models/mod.rs +++ b/crates/entities/src/models/mod.rs @@ -9,6 +9,7 @@ pub mod fetch_history; pub mod indexed_document; pub mod lens; pub mod link; +pub mod processed_files; pub mod resource_rule; pub mod tag; diff --git a/crates/entities/src/models/processed_files.rs b/crates/entities/src/models/processed_files.rs new file mode 100644 index 000000000..f772e7d1d --- /dev/null +++ b/crates/entities/src/models/processed_files.rs @@ -0,0 +1,75 @@ +use sea_orm::entity::prelude::*; +use sea_orm::Set; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Eq)] +#[sea_orm(table_name = "processed_files")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i64, + /// URL to crawl + #[sea_orm(unique)] + pub file_path: String, + + /// When this was first added to the crawl queue. 
+ pub created_at: DateTimeUtc, + /// When this task was last updated. + pub last_modified: DateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel { + fn new() -> Self { + Self { + created_at: Set(chrono::Utc::now()), + ..ActiveModelTrait::default() + } + } + + // Triggered before insert / update + fn before_save(mut self, insert: bool) -> Result { + Ok(self) + } +} + +/// Helper method used to remove all documents that are not in the provided paths. This +/// is used to remove documents for folders that are no longer configured +pub async fn remove_unmatched_paths( + db: &DatabaseConnection, + paths: Vec, +) -> anyhow::Result> { + let mut find = Entity::find(); + if !paths.is_empty() { + for path in paths { + find = find.filter(Column::FilePath.not_like(format!("{}%", path).as_str())); + } + } else { + log::debug!("No paths being watched removing all."); + } + + match find.all(db).await { + Ok(items) => { + log::debug!("Removing {:?} unused files from the database.", items.len()); + let ids = items.iter().map(|model| model.id).collect::>(); + if let Err(error) = Entity::delete_many() + .filter(Column::Id.is_in(ids)) + .exec(db) + .await + { + log::error!("Error deleting unused paths {:?}", error); + return Err(anyhow::Error::from(error)); + } + Ok(items) + } + Err(error) => return Err(anyhow::Error::from(error)), + } +} diff --git a/crates/entities/src/models/sql/dequeue.sqlx b/crates/entities/src/models/sql/dequeue.sqlx index 90f24cb31..f6c20b6ca 100644 --- a/crates/entities/src/models/sql/dequeue.sqlx +++ b/crates/entities/src/models/sql/dequeue.sqlx @@ -22,6 +22,7 @@ LEFT JOIN inflight ON inflight.domain = cq.domain WHERE COALESCE(indexed.count, 0) < ? AND COALESCE(inflight.count, 0) < ? AND - status = "Queued" + status = "Queued" and + url not like "file%" ORDER BY cq.updated_at ASC \ No newline at end of file diff --git a/crates/entities/src/models/tag.rs b/crates/entities/src/models/tag.rs index 9ebd7c581..80faa12cc 100644 --- a/crates/entities/src/models/tag.rs +++ b/crates/entities/src/models/tag.rs @@ -29,6 +29,10 @@ pub enum TagType { // mimetypes somewhere #[sea_orm(string_value = "mimetype")] MimeType, + // General type tag, Used for high level types ex: File, directory. 
The MimeType + // would be used as a more specific type + #[sea_orm(string_value = "type")] + Type, // where this document came from, #[sea_orm(string_value = "source")] Source, @@ -41,6 +45,9 @@ pub enum TagType { // Part of this/these lens(es) #[sea_orm(string_value = "lens")] Lens, + // For file based content this tag + #[sea_orm(string_value = "fileext")] + FileExt, } #[derive(AsRefStr)] diff --git a/crates/migrations/src/lib.rs b/crates/migrations/src/lib.rs index ff841c25b..a78a43e5c 100644 --- a/crates/migrations/src/lib.rs +++ b/crates/migrations/src/lib.rs @@ -24,6 +24,7 @@ mod m20221210_000001_add_crawl_tags_table; mod m20230104_000001_add_column_n_index; mod m20230111_000001_add_lens_column; mod m20230112_000001_migrate_search_schema; +mod m20230126_000001_create_file_table; mod utils; pub struct Migrator; @@ -53,6 +54,7 @@ impl MigratorTrait for Migrator { Box::new(m20230104_000001_add_column_n_index::Migration), Box::new(m20230111_000001_add_lens_column::Migration), Box::new(m20230112_000001_migrate_search_schema::Migration), + Box::new(m20230126_000001_create_file_table::Migration), ] } } diff --git a/crates/migrations/src/m20230126_000001_create_file_table.rs b/crates/migrations/src/m20230126_000001_create_file_table.rs new file mode 100644 index 000000000..80278a878 --- /dev/null +++ b/crates/migrations/src/m20230126_000001_create_file_table.rs @@ -0,0 +1,38 @@ +use entities::sea_orm::{ConnectionTrait, Statement}; +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230126_000001_create_file_table" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let processed_files = r#" + CREATE TABLE IF NOT EXISTS "processed_files" ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "file_path" text NOT NULL UNIQUE, + "created_at" text NOT NULL, + "last_modified" text NOT NULL);"#; + + for sql in &[processed_files] { + manager + .get_connection() + .execute(Statement::from_string( + manager.get_database_backend(), + sql.to_owned().to_string(), + )) + .await?; + } + + Ok(()) + } + + async fn down(&self, _: &SchemaManager) -> Result<(), DbErr> { + Ok(()) + } +} diff --git a/crates/spyglass/Cargo.toml b/crates/spyglass/Cargo.toml index eeaf8eee0..3e62a5de8 100644 --- a/crates/spyglass/Cargo.toml +++ b/crates/spyglass/Cargo.toml @@ -19,6 +19,7 @@ ego-tree = "0.6.2" entities = { path = "../entities" } flate2 = "1.0.24" futures = "0.3" +glob = "0.3.1" google = { git = "https://github.com/spyglass-search/third-party-apis", rev = "37675fbc7973b2e8ad7b8f1544f9f0f05f0ed1e4" } hex = "0.4" hostname = "^0.3" @@ -26,9 +27,13 @@ html5ever = "0.25" http = "0.2" ignore = "0.4" jsonrpsee = { version = "0.16.2", features = ["server"] } +lnk = "0.5.1" log = "0.4" migration = { path = "../migrations" } -notify = "5.0.0-pre.16" +mime = "0.3.16" +new_mime_guess = "4.0.1" +notify = { version = "5.0.0", default-features = false , features = ["serde", "macos_kqueue"]} +notify-debouncer-mini = { version = "*", default-features = false } open = "3.0" percent-encoding = "2.2" regex = "1" diff --git a/crates/spyglass/src/crawler/mod.rs b/crates/spyglass/src/crawler/mod.rs index 3cd4284ac..32024b64e 100644 --- a/crates/spyglass/src/crawler/mod.rs +++ b/crates/spyglass/src/crawler/mod.rs @@ -5,7 +5,9 @@ use addr::parse_domain_name; use anyhow::Result; use chrono::prelude::*; use chrono::Duration; +use 
entities::models::crawl_queue::EnqueueSettings; use entities::models::tag::TagPair; +use entities::models::tag::TagType; use percent_encoding::percent_decode_str; use sha2::{Digest, Sha256}; use thiserror::Error; @@ -16,6 +18,7 @@ use entities::sea_orm::prelude::*; use crate::connection::load_connection; use crate::crawler::bootstrap::create_archive_url; +use crate::filesystem; use crate::parser; use crate::scraper::{html_to_text, DEFAULT_DESC_LENGTH}; use crate::state::AppState; @@ -330,7 +333,7 @@ impl Crawler { // handle any fetching/parsing. match url.scheme() { "api" => self.handle_api_fetch(state, &crawl, &url).await, - "file" => self.handle_file_fetch(&crawl, &url).await, + "file" => self.handle_file_fetch(state, &crawl, &url).await, "http" | "https" => { self.handle_http_fetch(&state.db, &crawl, &url, parse_results) .await @@ -360,6 +363,7 @@ impl Crawler { async fn handle_file_fetch( &self, + state: &AppState, _: &crawl_queue::Model, url: &Url, ) -> Result { @@ -371,7 +375,7 @@ impl Crawler { let path = Path::new(&file_path); // Is this a file and does this exist? - if !path.exists() || !path.is_file() { + if !path.exists() { return Err(CrawlError::NotFound); } @@ -381,48 +385,7 @@ impl Crawler { .map(|x| x.to_string()) .expect("Unable to convert path file name to string"); - // Attempt to read file - let contents = match path.extension() { - Some(ext) if parser::supports_filetype(ext) => match parser::parse_file(ext, path) { - Err(err) => return Err(CrawlError::ParseError(err.to_string())), - Ok(contents) => contents, - }, - _ => match std::fs::read_to_string(path) { - Ok(x) => x, - Err(err) => { - return Err(CrawlError::FetchError(err.to_string())); - } - }, - }; - - let mut hasher = Sha256::new(); - hasher.update(contents.as_bytes()); - let content_hash = Some(hex::encode(&hasher.finalize()[..])); - - // TODO: Better description building for text files? - let description = if !contents.is_empty() { - let desc = contents - .split(' ') - .into_iter() - .take(DEFAULT_DESC_LENGTH) - .collect::>() - .join(" "); - Some(desc) - } else { - None - }; - - Ok(CrawlResult { - content_hash, - content: Some(contents.clone()), - // Does a file have a description? 
Pull the first part of the file - description, - title: Some(file_name), - url: url.to_string(), - open_url: Some(url.to_string()), - links: Default::default(), - ..Default::default() - }) + _process_path(state, path, file_name, url).await } /// Handle HTTP related requests @@ -504,6 +467,142 @@ impl Crawler { } } +fn _build_file_tags(path: &Path) -> Vec { + let mut tags = Vec::new(); + tags.push((TagType::Lens, String::from("filesystem"))); + if path.is_dir() { + tags.push((TagType::Type, String::from("directory"))); + } else if path.is_file() { + tags.push((TagType::Type, String::from("file"))); + let ext = path + .extension() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()); + if let Some(ext) = ext { + tags.push((TagType::FileExt, ext)); + } + } + + if path.is_symlink() { + tags.push((TagType::Type, String::from("symlink"))) + } + + let guess = new_mime_guess::from_path(path.clone()); + for mime_guess in guess.iter() { + tags.push((TagType::MimeType, mime_guess.to_string())); + } + + tags +} + +fn _process_file(path: &Path, file_name: String, url: &Url) -> Result { + // Attempt to read file + let contents = match path.extension() { + Some(ext) if parser::supports_filetype(ext) => match parser::parse_file(ext, path) { + Err(err) => return Err(CrawlError::ParseError(err.to_string())), + Ok(contents) => contents, + }, + _ => match std::fs::read_to_string(path) { + Ok(x) => x, + Err(err) => { + return Err(CrawlError::FetchError(err.to_string())); + } + }, + }; + + let mut hasher = Sha256::new(); + hasher.update(contents.as_bytes()); + let content_hash = Some(hex::encode(&hasher.finalize()[..])); + + // TODO: Better description building for text files? + let description = if !contents.is_empty() { + let desc = contents + .split(' ') + .into_iter() + .take(DEFAULT_DESC_LENGTH) + .collect::>() + .join(" "); + Some(desc) + } else { + None + }; + + let tags = _build_file_tags(path); + Ok(CrawlResult { + content_hash, + content: Some(contents.clone()), + // Does a file have a description? Pull the first part of the file + description, + title: Some(file_name), + url: url.to_string(), + open_url: Some(url.to_string()), + links: Default::default(), + tags: tags, + ..Default::default() + }) +} + +async fn _process_path( + state: &AppState, + path: &Path, + file_name: String, + url: &Url, +) -> Result { + let extension = filesystem::utils::get_supported_file_extensions(state); + + if path.is_file() { + if _matches_ext(path, &extension) { + log::error!("Is a file {:?}", path); + _process_file(path, file_name, url) + } else { + let tags = _build_file_tags(path); + let mut hasher = Sha256::new(); + hasher.update(file_name.as_bytes()); + let content_hash = Some(hex::encode(&hasher.finalize()[..])); + Ok(CrawlResult { + content_hash, + content: Some(file_name.clone()), + // Does a file have a description? Pull the first part of the file + description: Some(file_name.clone()), + title: Some(url.to_string()), + url: url.to_string(), + open_url: Some(url.to_string()), + links: Default::default(), + tags: tags, + ..Default::default() + }) + } + } else if path.is_dir() { + let tags = _build_file_tags(path); + let mut hasher = Sha256::new(); + hasher.update(file_name.as_bytes()); + let content_hash = Some(hex::encode(&hasher.finalize()[..])); + Ok(CrawlResult { + content_hash, + content: Some(file_name.clone()), + // Does a file have a description? 
Pull the first part of the file + description: Some(file_name.clone()), + title: Some(url.to_string()), + url: url.to_string(), + open_url: Some(url.to_string()), + links: Default::default(), + tags: tags, + ..Default::default() + }) + } else { + return Err(CrawlError::NotFound); + } +} + +fn _matches_ext(path: &Path, extension: &HashSet) -> bool { + let ext = &path + .extension() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()) + .unwrap_or_default(); + extension.contains(ext) +} + #[cfg(test)] mod test { use entities::models::crawl_queue::CrawlType; diff --git a/crates/spyglass/src/documents/mod.rs b/crates/spyglass/src/documents/mod.rs new file mode 100644 index 000000000..ef7c10e7b --- /dev/null +++ b/crates/spyglass/src/documents/mod.rs @@ -0,0 +1,46 @@ +use std::collections::HashMap; + +use entities::models::{crawl_queue, indexed_document}; +use entities::sea_orm::entity::prelude::*; + +use crate::{search::Searcher, state::AppState}; + +/// Helper method to delete indexed documents, crawl queue items and search +/// documents by url +pub async fn delete_documents_by_uri(state: &AppState, uri: Vec) { + log::debug!("Deleting {:?} documents", uri.len()); + + // Delete from crawl queue + + if let Err(error) = crawl_queue::delete_many_by_url(&state.db, &uri).await { + log::error!("Error delete items from crawl queue {:?}", error); + } + + // find all documents that already exist with that url + let existing: Vec = indexed_document::Entity::find() + .filter(indexed_document::Column::Url.is_in(uri.clone())) + .all(&state.db) + .await + .unwrap_or_default(); + + // build a hash map of Url to the doc id + let mut id_map = HashMap::new(); + for model in &existing { + let _ = id_map.insert(model.url.to_string(), model.doc_id.clone()); + } + + // build a list of doc ids to delete from the index + let doc_id_list = id_map + .values() + .into_iter() + .map(|x| x.to_owned()) + .collect::>(); + + let _ = Searcher::delete_many_by_id(state, &doc_id_list, false).await; + let _ = Searcher::save(state).await; + + // now that the documents are deleted delete from the queue + if let Err(error) = indexed_document::delete_many_by_url(&state.db, uri).await { + log::error!("Error deleting for indexed document store {:?}", error); + } +} diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs new file mode 100644 index 000000000..9dbc942c9 --- /dev/null +++ b/crates/spyglass/src/filesystem/mod.rs @@ -0,0 +1,653 @@ +use dashmap::DashMap; +use entities::models::crawl_queue::{self, CrawlType, EnqueueSettings}; +use entities::models::processed_files; +use entities::models::tag::TagType; +use entities::sea_orm::entity::prelude::*; +use entities::sea_orm::DatabaseConnection; +use ignore::gitignore::Gitignore; +use ignore::WalkBuilder; + +use entities::sea_orm::Set; +use migration::OnConflict; + +use std::sync::Arc; +use std::time::Duration; +use std::{ + collections::HashSet, + path::{Path, PathBuf}, +}; + +use notify::ReadDirectoryChangesWatcher; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::Mutex; +use uuid::Uuid; + +use notify_debouncer_mini::{DebouncedEvent, DebouncedEventKind, Debouncer}; + +use crate::documents; +use crate::state::AppState; + +pub mod utils; + +/// Watcher responsible for processing paths on the file system. +/// All filesystem updates will be run through the debouncer to +/// batch updates then processed through any found git ignore files. 
+/// Any updates that make it through will be passed to listeners
+pub struct SpyglassFileWatcher {
+    // The directory watcher service
+    watcher: Arc<Mutex<Debouncer<ReadDirectoryChangesWatcher>>>,
+    // The map of paths being watched to the list of watchers
+    path_map: DashMap<PathBuf, Vec<WatchPath>>,
+    // Map of .gitignore file path to the ignore file processor
+    ignore_files: DashMap<PathBuf, Gitignore>,
+    // The database connection used to update the database with
+    // the state of file processing
+    db: DatabaseConnection,
+}
+
+/// The watch path represents a watcher of a path. The watcher will
+/// be notified of system changes via the sender and receiver
+#[derive(Debug)]
+pub struct WatchPath {
+    _path: PathBuf,
+    _uuid: String,
+    extensions: Option<HashSet<String>>,
+    tx_channel: Option<Sender<Vec<DebouncedEvent>>>,
+}
+
+impl WatchPath {
+    /// Constructs a new watch path with the path that is being watched
+    /// and the set of extensions to notify the listener with
+    pub fn new(path: &Path, extensions: Option<HashSet<String>>) -> Self {
+        let uuid = Uuid::new_v4().as_hyphenated().to_string();
+
+        WatchPath {
+            _path: path.to_path_buf(),
+            _uuid: uuid.clone(),
+            extensions,
+            tx_channel: None::<Sender<Vec<DebouncedEvent>>>,
+        }
+    }
+
+    /// Builds the receiver used to receive file update messages
+    pub fn build_rx(&mut self) -> Receiver<Vec<DebouncedEvent>> {
+        let (tx, rx) = tokio::sync::mpsc::channel(1);
+        self.tx_channel = Some(tx);
+        rx
+    }
+
+    /// Sends a change notification to the receiver
+    pub async fn send_notify(&self, events: Vec<DebouncedEvent>) -> anyhow::Result<()> {
+        if let Some(tx) = &self.tx_channel {
+            match &self.extensions {
+                Some(ext_list) => {
+                    // if there are extension filters only grab files that match
+                    // the extension
+                    let valid_events = events
+                        .iter()
+                        .filter_map(|evt| {
+                            if let Some(ext) = evt.path.extension() {
+                                if let Ok(ext_string) = ext.to_owned().into_string() {
+                                    if ext_list.contains(&ext_string) {
+                                        return Some(evt.clone());
+                                    }
+                                }
+                            }
+                            None
+                        })
+                        .collect::<Vec<DebouncedEvent>>();
+
+                    // Send all valid updates to the listener
+                    if !valid_events.is_empty() {
+                        if let Err(error) = tx.send(valid_events).await {
+                            log::error!("Error sending event {:?}", error);
+                            return Err(anyhow::Error::from(error));
+                        }
+                    }
+                }
+                None => {
+                    // With no extension filter send all updates to the
+                    // listener
+                    if let Err(error) = tx.send(events).await {
+                        log::error!("Error sending event {:?}", error);
+                        return Err(anyhow::Error::from(error));
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+/// General helper method used to watch for file change events and shutdown events.
+/// This is the topmost watcher that receives all file update events and
+/// then sends them for appropriate processing
+async fn watch_events(
+    state: AppState,
+    mut file_events: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
+) {
+    let mut shutdown_rx = state.shutdown_cmd_tx.lock().await.subscribe();
+    loop {
+        // Wait for next command / handle shutdown responses
+        let next_cmd = tokio::select! {
+            // Listen for file change notifications
+            file_event = file_events.recv() => {
+                if let Some(Ok(file_event)) = file_event {
+                    Some(file_event)
+                } else {
+                    None
+                }
+            },
+            _ = shutdown_rx.recv() => {
+                log::info!("🛑 Shutting down file watch loop");
+
+                file_events.close();
+                let mut watcher = state.file_watcher.lock().await;
+                if let Some(watcher) = watcher.as_mut() {
+                    watcher.close();
+                }
+                return;
+            }
+        };
+
+        let watcher = state.file_watcher.lock().await;
+        if let Some(events) = next_cmd {
+            // Received some events, now process them through the watcher
+            if let Some(watcher) = watcher.as_ref() {
+                // reduce the events to only ones that should be processed
+                // by the system
+                let filtered_eventlist = watcher.filter_events(&events);
+
+                // if we found any new .gitignore files add them for
+                // further processing. This is normal in the case
+                // you git clone in a watched directory; a later
+                // build step would require us to filter out ignored
+                // target folders
+                let ignore_files = &filtered_eventlist
+                    .iter()
+                    .filter_map(|evt| {
+                        if utils::is_ignore_file(evt.path.as_path()) {
+                            Some(evt.path.clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<PathBuf>>();
+                watcher.add_ignore_files(ignore_files);
+                watcher.process_changes(&filtered_eventlist).await;
+
+                // Send chunks of events only to the watchers that care
+                for path_ref in &watcher.path_map {
+                    let filtered_events = filtered_eventlist
+                        .iter()
+                        .filter_map(|evt| {
+                            if evt.path.starts_with(path_ref.key()) {
+                                Some(evt.clone())
+                            } else {
+                                None
+                            }
+                        })
+                        .collect::<Vec<DebouncedEvent>>();
+
+                    if !filtered_events.is_empty() {
+                        let val = path_ref.value();
+                        notify_watchers(filtered_events, val).await;
+                    }
+                }
+            }
+        }
+
+        // Sleep a little at the end of each cmd
+        tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
+    }
+}
+
+/// General notification helper used to loop through the watchers and send the
+/// events
+async fn notify_watchers(events: Vec<DebouncedEvent>, watch_list: &Vec<WatchPath>) {
+    log::debug!("Sending {:?} updates to file watchers", events.len());
+    for watcher in watch_list {
+        let rslt = watcher.send_notify(events.clone()).await;
+        if let Err(error) = rslt {
+            log::error!("Error sending notify {:?}", error);
+        }
+    }
+}
+
+impl SpyglassFileWatcher {
+    /// Creates a new filewatcher that will watch for file changes and send updates
+    /// to listeners
+    pub fn new(state: &AppState) -> Self {
+        let (tx, file_events) = tokio::sync::mpsc::channel(1);
+
+        let watcher =
+            notify_debouncer_mini::new_debouncer(Duration::from_secs(5), None, move |res| {
+                futures::executor::block_on(async {
+                    if !tx.is_closed() {
+                        if let Err(err) = tx.send(res).await {
+                            log::error!("fseventwatcher error: {}", err.to_string());
+                        }
+                    }
+                })
+            })
+            .expect("Unable to watch lens directory");
+
+        let spy_watcher = SpyglassFileWatcher {
+            watcher: Arc::new(Mutex::new(watcher)),
+            path_map: DashMap::new(),
+            ignore_files: DashMap::new(),
+            db: state.db.clone(),
+        };
+
+        let _ = tokio::spawn(watch_events(state.clone(), file_events));
+
+        spy_watcher
+    }
+
+    /// Helper method used to update the database with newly arrived changes
+    async fn process_changes(&self, events: &Vec<DebouncedEvent>) {
+        let mut inserts = Vec::new();
+        let mut removals = Vec::new();
+
+        for event in events {
+            if event.path.exists() {
+                let mut model = processed_files::ActiveModel::new();
+                model.file_path = Set(utils::path_buf_to_uri(&event.path));
+                model.last_modified = Set(utils::last_modified_time(&event.path));
+                inserts.push(model);
+            } else {
+                removals.push(utils::path_buf_to_uri(&event.path));
+            }
+        }
+
+        if 
!inserts.is_empty() {
+            if let Err(error) = processed_files::Entity::insert_many(inserts)
+                .on_conflict(
+                    OnConflict::column(processed_files::Column::FilePath)
+                        .update_column(processed_files::Column::LastModified)
+                        .to_owned(),
+                )
+                .exec(&self.db)
+                .await
+            {
+                log::error!("Error inserting updates {:?}", error);
+            }
+        }
+
+        if !removals.is_empty() {
+            if let Err(error) = processed_files::Entity::delete_many()
+                .filter(processed_files::Column::FilePath.is_in(removals))
+                .exec(&self.db)
+                .await
+            {
+                log::error!("Error processing deletes {:?}", error);
+            }
+        }
+    }
+
+    /// Closes the watcher and associated resources
+    fn close(&mut self) {
+        for path_ref in self.path_map.iter() {
+            for path in path_ref.value() {
+                if let Some(sender) = &path.tx_channel {
+                    drop(sender);
+                }
+            }
+        }
+    }
+
+    /// Adds .gitignore files
+    fn add_ignore_files(&self, files: &Vec<PathBuf>) {
+        for path in files {
+            if let Ok(patterns) = utils::patterns_from_file(path.as_path()) {
+                self.ignore_files.insert(path.to_owned(), patterns);
+            }
+        }
+    }
+
+    /// Adds a single .gitignore file
+    fn add_ignore_file(&self, file: &Path) {
+        if let Ok(patterns) = utils::patterns_from_file(file) {
+            self.ignore_files.insert(file.to_path_buf(), patterns);
+        }
+    }
+
+    /// Filters the provided events and returns the list of events that should not
+    /// be ignored
+    fn filter_events(&self, events: &Vec<DebouncedEvent>) -> Vec<DebouncedEvent> {
+        events
+            .iter()
+            .filter_map(|evt| {
+                if evt.kind != DebouncedEventKind::AnyContinuous
+                    && !self.is_ignored(evt.path.as_path())
+                {
+                    Some(evt.clone())
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<DebouncedEvent>>()
+    }
+
+    /// Checks if the path represents a hidden directory or
+    /// a file ignored by a .gitignore file
+    fn is_ignored(&self, path: &Path) -> bool {
+        if utils::is_in_hidden_dir(path) {
+            return true;
+        }
+
+        // Check the path against each registered .gitignore file
+        for map_ref in &self.ignore_files {
+            let root = map_ref.key();
+            let patterns = map_ref.value();
+            if let Some(parent) = root.parent() {
+                if path.starts_with(parent) {
+                    return patterns
+                        .matched_path_or_any_parents(path, path.is_dir())
+                        .is_ignore();
+                }
+            }
+        }
+
+        false
+    }
+
+    /// Sets up a watcher for the specified path. If two watchers are registered
+    /// for the same path only one file system watcher is registered and both
+    /// listeners are notified
+    pub async fn watch_path(
+        &mut self,
+        path: &Path,
+        extensions: Option<HashSet<String>>,
+        recursive: bool,
+    ) -> Receiver<Vec<DebouncedEvent>> {
+        let mut watch_path = WatchPath::new(path, extensions);
+        let rx = watch_path.build_rx();
+
+        let path_buf = path.to_path_buf();
+        let new_path = !self.path_map.contains_key(&path_buf);
+        self.path_map
+            .entry(path_buf)
+            .or_insert(Vec::new())
+            .push(watch_path);
+
+        let mode = if recursive {
+            notify::RecursiveMode::Recursive
+        } else {
+            notify::RecursiveMode::NonRecursive
+        };
+
+        if new_path {
+            let watch_rslt = self.watcher.lock().await.watcher().watch(path, mode);
+            if let Err(error) = watch_rslt {
+                log::error!(
+                    "Error attempting to watch path: {:?}, Error: {:?}",
+                    path,
+                    error
+                );
+            }
+        }
+        rx
+    }
+
+    /// Initializes the path by walking the entire tree. 
All changed, removed and new files + /// are returned as debounced events + pub async fn initialize_path(&mut self, path: &Path) -> Vec { + let mut debounced_events = Vec::new(); + let root_uri = utils::path_to_uri(path); + let files = DashMap::new(); + + // will not ignore hidden since we need to include .git files + let walker = WalkBuilder::new(path).hidden(false).build(); + for entry in walker.flatten() { + if !utils::is_in_hidden_dir(entry.path()) { + if utils::is_ignore_file(entry.path()) { + self.add_ignore_file(entry.path()); + } + + let uri = utils::path_to_uri(entry.path()); + let time = utils::last_modified_time_for_path(entry.path()); + files.insert(uri, time); + } + } + + let processed_files = processed_files::Entity::find() + .filter(processed_files::Column::FilePath.starts_with(root_uri.as_str())) + .all(&self.db) + .await; + let mut to_delete = Vec::new(); + let mut to_recrawl = Vec::new(); + + // Check all items already in the database if it is still in the file system + // then see if it has updated, if it is not then it has been deleted so + // add it to the deleted items. All remaining files in the map are new + if let Ok(processed) = processed_files { + for item in processed { + match files.remove(&item.file_path) { + Some((file_path, file_last_mod)) => { + if file_last_mod > item.last_modified { + debounced_events.push(DebouncedEvent { + path: utils::uri_to_path(&file_path).unwrap(), + kind: DebouncedEventKind::Any, + }); + to_recrawl.push((item.file_path, file_last_mod)); + } + } + None => { + debounced_events.push(DebouncedEvent { + path: utils::uri_to_path(&item.file_path).unwrap(), + kind: DebouncedEventKind::Any, + }); + to_delete.push(item.id) + } + } + } + } + + log::debug!( + "Added: {:?} Deleted: {:?} Updated: {:?}", + files.len(), + to_delete.len(), + to_recrawl.len() + ); + + if !to_delete.is_empty() { + if let Err(error) = processed_files::Entity::delete_many() + .filter(processed_files::Column::Id.is_in(to_delete)) + .exec(&self.db) + .await + { + log::error!("Error deleting processed files {:?}", error); + } + } + + if !files.is_empty() { + let models = files + .iter() + .map(|path_ref| { + debounced_events.push(DebouncedEvent { + path: utils::uri_to_path(&path_ref.key()).unwrap(), + kind: DebouncedEventKind::Any, + }); + + let mut active_model = processed_files::ActiveModel::new(); + active_model.file_path = Set(path_ref.key().clone()); + active_model.last_modified = Set(path_ref.value().clone()); + + active_model + }) + .collect::>(); + + if let Err(error) = processed_files::Entity::insert_many(models) + .exec(&self.db) + .await + { + log::error!("Error inserting additions {:?}", error); + } + } + + if !to_recrawl.is_empty() { + let updates = to_recrawl + .iter() + .map(|(uri, last_modified)| { + let mut active_model = processed_files::ActiveModel::new(); + active_model.file_path = Set(uri.clone()); + active_model.last_modified = Set(last_modified.clone()); + + active_model + }) + .collect::>(); + + if let Err(error) = processed_files::Entity::insert_many(updates) + .on_conflict( + OnConflict::column(processed_files::Column::FilePath) + .update_column(processed_files::Column::LastModified) + .to_owned(), + ) + .exec(&self.db) + .await + { + log::error!("Error updated recrawls {:?}", error); + } + } + log::debug!("Returning {:?} updates", files.len()); + + debounced_events + } +} + +/// Configures the file watcher with the user set directories +pub async fn configure_watcher(state: AppState) { + // temp use plugin configuration + let extension = 
utils::get_supported_file_extensions(&state);
+    let paths = utils::get_search_directories(&state);
+    let path_names = paths
+        .iter()
+        .map(|path| utils::path_buf_to_uri(path))
+        .collect::<Vec<String>>();
+
+    let mut watcher = state.file_watcher.lock().await;
+    if let Some(watcher) = watcher.as_mut() {
+        for path in paths {
+            log::debug!("Adding {:?} to watch list", path);
+            let updates = watcher.initialize_path(path.as_path()).await;
+            let rx1 = watcher.watch_path(path.as_path(), None, true).await;
+
+            tokio::spawn(_process_messages(
+                state.clone(),
+                rx1,
+                updates,
+                extension.clone(),
+            ));
+        }
+    } else {
+        log::error!("Watcher is missing");
+    }
+
+    match processed_files::remove_unmatched_paths(&state.db, path_names).await {
+        Ok(removed) => {
+            let uri_list = removed
+                .iter()
+                .map(|model| model.file_path.clone())
+                .collect::<Vec<String>>();
+            documents::delete_documents_by_uri(&state, uri_list).await;
+        }
+        Err(error) => log::error!(
+            "Error removing paths that are no longer being watched. {:?}",
+            error
+        ),
+    }
+
+    // TODO remove the content from extensions that are no longer being processed, this should be the
+    // purview of the document handling and not the file handling since we cannot make the assumption
+    // here of what happens to files that do not meet the expected extension.
+}
+
+/// Helper method used to process updates from a watched path
+async fn _process_messages(
+    state: AppState,
+    mut rx: Receiver<Vec<DebouncedEvent>>,
+    initial: Vec<DebouncedEvent>,
+    extensions: HashSet<String>,
+) {
+    log::debug!("Processing {:?} initial updates.", initial.len());
+    if let Err(error) = _process_file_and_dir(&state, initial, &extensions).await {
+        log::error!("Error processing initial files {:?}", error);
+    }
+
+    loop {
+        let msg = rx.recv().await;
+        match msg {
+            Some(event) => {
+                if let Err(error) = _process_file_and_dir(&state, event, &extensions).await {
+                    log::error!("Error processing updates {:?}", error);
+                }
+            }
+            None => {
+                log::info!("Message queue has closed. 
Stopping processing"); + break; + } + } + } +} + +/// Helper method used process all updated files and directories +async fn _process_file_and_dir( + state: &AppState, + events: Vec, + extensions: &HashSet, +) -> anyhow::Result<()> { + let mut enqueue_list = Vec::new(); + let mut delete_list = Vec::new(); + for event in events { + let path = event.path; + let uri = utils::path_to_uri(&path); + + if path.exists() { + if utils::is_windows_shortcut(path.as_path()) { + let location = utils::get_shortcut_destination(path.as_path()); + + if let Some(location) = location { + let ext = &location + .extension() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()) + .unwrap_or_default(); + + // If the shortcut points to a file we can process then + // process the file instead of the shortcut + if extensions.contains(ext) { + let file_uri = utils::path_to_uri(&location); + enqueue_list.push(file_uri); + } + } + } + + enqueue_list.push(uri); + } else { + delete_list.push(uri); + } + } + + if !enqueue_list.is_empty() { + let tags = vec![(TagType::Lens, String::from("filesystem"))]; + let enqueue_settings = EnqueueSettings { + crawl_type: CrawlType::Normal, + is_recrawl: true, + tags: tags, + force_allow: true, + }; + if let Err(error) = + crawl_queue::enqueue_local_files(&state.db, &enqueue_list, &enqueue_settings, None) + .await + { + log::error!("Error adding to crawl queue {:?}", error); + } + } + + if !delete_list.is_empty() { + documents::delete_documents_by_uri(&state, delete_list).await; + } + + Ok(()) +} diff --git a/crates/spyglass/src/filesystem/utils.rs b/crates/spyglass/src/filesystem/utils.rs new file mode 100644 index 000000000..fbbf7bb78 --- /dev/null +++ b/crates/spyglass/src/filesystem/utils.rs @@ -0,0 +1,239 @@ +extern crate glob; +use chrono::{DateTime, NaiveDateTime, Utc}; +use ignore::{gitignore::Gitignore, Error}; +use std::{ + collections::HashSet, + ffi::OsStr, + path::{Path, PathBuf}, + time::UNIX_EPOCH, +}; +use url::Url; + +use crate::state::AppState; + +// Create a file URI +pub fn path_to_uri(path: &Path) -> String { + let path_str = path.display().to_string(); + // Eventually this will be away to keep track of multiple devices and searching across + // them. + let host = ""; + + let mut new_url = Url::parse("file://").expect("Base URI"); + let _ = new_url.set_host(Some(host)); + // Fixes issues handling windows drive letters + let path_str = path_str.replace(':', "%3A"); + // Fixes an issue where DirEntry adds too many escapes. + let path_str = path_str.replace("\\\\", "\\"); + new_url.set_path(&path_str); + new_url.to_string() +} + +// Create a file URI +pub fn path_buf_to_uri(path: &PathBuf) -> String { + let path_str = path.display().to_string(); + // Eventually this will be away to keep track of multiple devices and searching across + // them. + let host = ""; + + let mut new_url = Url::parse("file://").expect("Base URI"); + let _ = new_url.set_host(Some(host)); + // Fixes issues handling windows drive letters + let path_str = path_str.replace(':', "%3A"); + // Fixes an issue where DirEntry adds too many escapes. 
+ let path_str = path_str.replace("\\\\", "\\"); + new_url.set_path(&path_str); + new_url.to_string() +} + +/// Converts a uri to a valid path buf +pub fn uri_to_path(uri: &String) -> anyhow::Result { + match Url::parse(uri) { + Ok(url) => match url.to_file_path() { + Ok(path) => Ok(path), + Err(_) => Err(anyhow::format_err!("Unable to access file path")), + }, + Err(error) => Err(anyhow::Error::from(error)), + } +} + +/// Identifies if the provided path represents a windows shortcut +pub fn is_windows_shortcut(path: &Path) -> bool { + let ext = &path + .extension() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()) + .unwrap_or_default(); + ext.eq("lnk") +} + +/// Helper method used to get the destination for a windows shortcut. Note that +/// this method currently only checks the local base path and local base path unicode +pub fn get_shortcut_destination(path: &Path) -> Option { + let shortcut = lnk::ShellLink::open(path); + if let Ok(shortcut) = shortcut { + if let Some(link_info) = &shortcut.link_info() { + if link_info.local_base_path().is_some() { + return Some(PathBuf::from(link_info.local_base_path().clone().unwrap())); + } else if link_info.local_base_path_unicode().is_some() { + return Some(PathBuf::from( + link_info.local_base_path_unicode().clone().unwrap(), + )); + } + } + } + None +} + +/// Accessor for the last modified time for a file. If the last modified +/// time is not available now is returned +pub fn last_modified_time_for_path(path: &Path) -> DateTime { + if let Ok(metadata) = path.metadata() { + if let Ok(modified) = metadata.modified() { + let since_the_epoch = modified + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + + if let Some(time) = + NaiveDateTime::from_timestamp_millis(since_the_epoch.as_millis() as i64) + { + return DateTime::::from_utc(time, Utc); + } else { + Utc::now() + } + } else { + Utc::now() + } + } else { + Utc::now() + } +} + +/// Accessor for the last modified time for a file. If the last modified +/// time is not available now is returned +pub fn last_modified_time(path: &PathBuf) -> DateTime { + if let Ok(metadata) = path.metadata() { + if let Ok(modified) = metadata.modified() { + let since_the_epoch = modified + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + + if let Some(time) = + NaiveDateTime::from_timestamp_millis(since_the_epoch.as_millis() as i64) + { + return DateTime::::from_utc(time, Utc); + } else { + Utc::now() + } + } else { + Utc::now() + } + } else { + Utc::now() + } +} + +/// Helper method used to access the configured file search directories from +/// user settings. +pub fn get_search_directories(state: &AppState) -> Vec { + let plugin_settings = state.user_settings.plugin_settings.clone(); + let local_file_settings = plugin_settings.get("local-file-importer").unwrap(); + let dir_list = local_file_settings.get("FOLDERS_LIST"); + + let directories = + if let Ok(dirs) = serde_json::from_str::>(dir_list.unwrap().as_str()) { + dirs + } else { + HashSet::new() + }; + + directories + .iter() + .map(|str| PathBuf::from(str)) + .collect::>() +} + +/// Helper method used to access the configured file extensions from +/// user settings. 
+pub fn get_supported_file_extensions(state: &AppState) -> HashSet { + let plugin_settings = state.user_settings.plugin_settings.clone(); + let local_file_settings = plugin_settings.get("local-file-importer").unwrap(); + let ext_list = local_file_settings.get("EXTS_LIST"); + + if let Ok(exts) = serde_json::from_str::>(ext_list.unwrap().as_str()) { + exts + } else { + HashSet::new() + } +} + +/// Helper method used to identify if the provided path represents a gitignore file +pub fn is_ignore_file(path: &Path) -> bool { + if let Some(file_name) = path.file_name() { + return file_name.eq(OsStr::new(".gitignore")); + } + + return false; +} + +/// Helper method used to convert a gitignore file into a processed gitignore object +pub fn patterns_from_file(path: &Path) -> Result { + let mut builder = ignore::gitignore::GitignoreBuilder::new(path.parent().unwrap()); + builder.add(path); + builder.build() +} + +/// Helper method used to identify if the provided path is in a "hidden" directory. +/// In this case a "hidden" directory is any directory that starts with "." Example: +/// .git +/// .ssh +pub fn is_in_hidden_dir(path: &Path) -> bool { + path.ancestors() + .find(|ancestor| { + if ancestor.is_dir() { + if let Some(name) = ancestor.file_name() { + if let Some(name_str) = name.to_str() { + if name_str.starts_with(".") { + return true; + } + } + } + } + false + }) + .is_some() +} + +#[cfg(test)] +mod test { + use std::path::Path; + + use super::path_to_uri; + use url::Url; + + #[test] + fn test_path_to_uri() { + #[cfg(target_os = "windows")] + let test_folder = Path::new("C:\\tmp\\path_to_uri"); + + #[cfg(not(target_os = "windows"))] + let test_folder = Path::new("/tmp/path_to_uri"); + + std::fs::create_dir_all(test_folder).expect("Unable to create test dir"); + + let test_path = test_folder.join("test.txt"); + let uri = path_to_uri(&test_path.to_path_buf()); + + #[cfg(target_os = "windows")] + assert_eq!(uri, "file://localhost/C%3A/tmp/path_to_uri/test.txt"); + #[cfg(not(target_os = "windows"))] + assert_eq!(uri, "file://localhost/tmp/path_to_uri/test.txt"); + + let url = Url::parse(&uri).unwrap(); + let file_path = url.to_file_path().unwrap(); + assert_eq!(file_path, test_path); + + if test_folder.exists() { + std::fs::remove_dir_all(test_folder).expect("Unable to clean up test folder"); + } + } +} diff --git a/crates/spyglass/src/lib.rs b/crates/spyglass/src/lib.rs index 02d01de75..e3cbb3761 100644 --- a/crates/spyglass/src/lib.rs +++ b/crates/spyglass/src/lib.rs @@ -3,6 +3,8 @@ extern crate html5ever; pub mod connection; pub mod crawler; +pub mod documents; +pub mod filesystem; pub mod oauth; pub mod parser; pub mod pipeline; diff --git a/crates/spyglass/src/main.rs b/crates/spyglass/src/main.rs index 7d6d06dba..b1596b368 100644 --- a/crates/spyglass/src/main.rs +++ b/crates/spyglass/src/main.rs @@ -233,6 +233,12 @@ async fn start_backend(state: AppState, config: Config) { plugin_cmd_rx, )); + let watcher = libspyglass::filesystem::SpyglassFileWatcher::new(&state); + { + state.file_watcher.lock().await.replace(watcher); + } + tokio::spawn(libspyglass::filesystem::configure_watcher(state.clone())); + // Gracefully handle shutdowns match signal::ctrl_c().await { Ok(()) => { diff --git a/crates/spyglass/src/state.rs b/crates/spyglass/src/state.rs index 9124387c6..f80344136 100644 --- a/crates/spyglass/src/state.rs +++ b/crates/spyglass/src/state.rs @@ -7,6 +7,7 @@ use tokio::sync::mpsc::error::SendError; use tokio::sync::Mutex; use tokio::sync::{broadcast, mpsc}; +use 
crate::filesystem::SpyglassFileWatcher; use crate::task::AppShutdown; use crate::{ pipeline::PipelineCommand, @@ -26,6 +27,7 @@ pub struct AppState { pub user_settings: UserSettings, pub index: Searcher, pub metrics: Metrics, + pub config: Config, // Task scheduler command/control pub manager_cmd_tx: Arc>>>, pub shutdown_cmd_tx: Arc>>, @@ -36,6 +38,7 @@ pub struct AppState { pub plugin_manager: Arc>, // Pipeline command/control pub pipeline_cmd_tx: Arc>>>, + pub file_watcher: Arc>>, } impl AppState { @@ -78,6 +81,7 @@ impl AppState { &Config::machine_identifier(), config.user_settings.disable_telemetry, ), + config: config.clone(), lenses: Arc::new(lenses), pipelines: Arc::new(pipelines), index, @@ -87,6 +91,7 @@ impl AppState { pipeline_cmd_tx: Arc::new(Mutex::new(None)), plugin_manager: Arc::new(Mutex::new(PluginManager::new())), manager_cmd_tx: Arc::new(Mutex::new(None)), + file_watcher: Arc::new(Mutex::new(None)), } } @@ -153,12 +158,14 @@ impl AppStateBuilder { &Config::machine_identifier(), user_settings.disable_telemetry, ), + config: Config::new(), pause_cmd_tx: Arc::new(Mutex::new(None)), pipeline_cmd_tx: Arc::new(Mutex::new(None)), pipelines: Arc::new(pipelines), plugin_cmd_tx: Arc::new(Mutex::new(None)), plugin_manager: Arc::new(Mutex::new(PluginManager::new())), shutdown_cmd_tx: Arc::new(Mutex::new(shutdown_tx)), + file_watcher: Arc::new(Mutex::new(None)), user_settings, } } diff --git a/crates/spyglass/src/task.rs b/crates/spyglass/src/task.rs index 4f770d81d..1c1b53ad7 100644 --- a/crates/spyglass/src/task.rs +++ b/crates/spyglass/src/task.rs @@ -124,7 +124,7 @@ pub async fn manager_task( // first tick always completes immediately. queue_check_interval.tick().await; } else { - queue_check_interval = tokio::time::interval(Duration::from_millis(100)); + queue_check_interval = tokio::time::interval(Duration::from_millis(50)); // first tick always completes immediately. queue_check_interval.tick().await; } diff --git a/crates/spyglass/src/task/manager.rs b/crates/spyglass/src/task/manager.rs index 9b26e3818..272fef392 100644 --- a/crates/spyglass/src/task/manager.rs +++ b/crates/spyglass/src/task/manager.rs @@ -8,6 +8,7 @@ use crate::state::AppState; // Check for new jobs in the crawl queue and add them to the worker queue. #[tracing::instrument(skip(state, queue))] pub async fn check_for_jobs(state: &AppState, queue: &mpsc::Sender) -> bool { + let mut started_task = None; // Do we have any crawl tasks? match crawl_queue::dequeue(&state.db, state.user_settings.clone()).await { Ok(Some(task)) => { @@ -23,7 +24,7 @@ pub async fn check_for_jobs(state: &AppState, queue: &mpsc::Sender { // Send to worker @@ -31,17 +32,55 @@ pub async fn check_for_jobs(state: &AppState, queue: &mpsc::Sender { log::error!("Unable to dequeue jobs: {}", err.to_string()); - return false; + started_task = Some(false); } _ => {} } + // Do we have any crawl tasks? 
+ match crawl_queue::dequeue_files(&state.db, state.user_settings.clone()).await { + Ok(Some(task)) => { + match &task.pipeline { + Some(pipeline) => { + if let Some(pipeline_tx) = state.pipeline_cmd_tx.lock().await.as_mut() { + log::debug!("Sending crawl task to pipeline"); + let cmd = PipelineCommand::ProcessUrl( + pipeline.clone(), + CrawlTask { id: task.id }, + ); + if let Err(err) = pipeline_tx.send(cmd).await { + log::error!("Unable to send crawl task to pipeline {:?}", err); + } + } + started_task = Some(true); + } + None => { + // Send to worker + let cmd = WorkerCommand::Crawl { id: task.id }; + if queue.send(cmd).await.is_err() { + log::error!("unable to send command to worker"); + } + started_task = Some(true); + } + } + } + Err(err) => { + log::error!("Unable to dequeue jobs: {}", err.to_string()); + started_task = Some(false); + } + _ => {} + } + + if let Some(ret) = started_task { + return ret; + } + // No crawl tasks, check for recrawl tasks match crawl_queue::dequeue_recrawl(&state.db, &state.user_settings).await { Ok(Some(task)) => { From f6881c349afd549601655fd15d4c5c321c49b827 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Fri, 27 Jan 2023 19:49:06 -0800 Subject: [PATCH 2/9] Update file watcher to watch for enable and disable --- crates/entities/src/models/crawl_queue.rs | 6 +- crates/spyglass/src/filesystem/mod.rs | 107 ++++++++++++++-------- crates/spyglass/src/filesystem/utils.rs | 69 +++++++------- plugins/local-file-indexer/src/main.rs | 57 +----------- 4 files changed, 106 insertions(+), 133 deletions(-) diff --git a/crates/entities/src/models/crawl_queue.rs b/crates/entities/src/models/crawl_queue.rs index 3a6c824e5..919b0e589 100644 --- a/crates/entities/src/models/crawl_queue.rs +++ b/crates/entities/src/models/crawl_queue.rs @@ -303,12 +303,10 @@ pub async fn dequeue( Some(task) } else { // Otherwise, grab a URL off the stack & send it back. - let val = Entity::find() + Entity::find() .from_raw_sql(gen_dequeue_sql(user_settings)) .one(db) - .await?; - log::error!("Got one {:?}", val); - val + .await? 
} }; diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs index 9dbc942c9..388ed9855 100644 --- a/crates/spyglass/src/filesystem/mod.rs +++ b/crates/spyglass/src/filesystem/mod.rs @@ -2,6 +2,7 @@ use dashmap::DashMap; use entities::models::crawl_queue::{self, CrawlType, EnqueueSettings}; use entities::models::processed_files; use entities::models::tag::TagType; +use entities::models::lens; use entities::sea_orm::entity::prelude::*; use entities::sea_orm::DatabaseConnection; use ignore::gitignore::Gitignore; @@ -519,48 +520,76 @@ impl SpyglassFileWatcher { /// Configures the file watcher with the user set directories pub async fn configure_watcher(state: AppState) { // temp use plugin configuration - let extension = utils::get_supported_file_extensions(&state); - let paths = utils::get_search_directories(&state); - let path_names = paths - .iter() - .map(|path| utils::path_buf_to_uri(path)) - .collect::>(); - - let mut watcher = state.file_watcher.lock().await; - if let Some(watcher) = watcher.as_mut() { - for path in paths { - log::debug!("Adding {:?} to watch list", path); - let updates = watcher.initialize_path(path.as_path()).await; - let rx1 = watcher.watch_path(path.as_path(), None, true).await; - - tokio::spawn(_process_messages( - state.clone(), - rx1, - updates, - extension.clone(), - )); - } - } else { - log::error!("Watcher is missing"); - } + if let Ok(Some(lens)) = lens::find_by_name("local-file-importer", &state.db).await { + if lens.is_enabled { + log::info!("📂 Loading local file watcher"); - match processed_files::remove_unmatched_paths(&state.db, path_names).await { - Ok(removed) => { - let uri_list = removed + let extension = utils::get_supported_file_extensions(&state); + let paths = utils::get_search_directories(&state); + let path_names = paths .iter() - .map(|model| model.file_path.clone()) + .map(|path| utils::path_buf_to_uri(path)) .collect::>(); - documents::delete_documents_by_uri(&state, uri_list).await; - } - Err(error) => log::error!( - "Error removing paths that are no longer being watched. {:?}", - error - ), - } + + let mut watcher = state.file_watcher.lock().await; + if let Some(watcher) = watcher.as_mut() { + for path in paths { + log::debug!("Adding {:?} to watch list", path); + let updates = watcher.initialize_path(path.as_path()).await; + let rx1 = watcher.watch_path(path.as_path(), None, true).await; + + tokio::spawn(_process_messages( + state.clone(), + rx1, + updates, + extension.clone(), + )); + } + } else { + log::error!("Watcher is missing"); + } + + match processed_files::remove_unmatched_paths(&state.db, path_names).await { + Ok(removed) => { + let uri_list = removed + .iter() + .map(|model| model.file_path.clone()) + .collect::>(); + documents::delete_documents_by_uri(&state, uri_list).await; + } + Err(error) => log::error!( + "Error removing paths that are no longer being watched. {:?}", + error + ), + } + + // TODO remove the content from extensions that are no longer being processed, this should be the + // purview of the document handling and not the file handling since we cannot make the assumption + // here of what happens to files that do not meet the expected extension. + - // TODO remove the content from extensions that are no longer being processed, this should be the - // purview of the document handling and not the file handling since we cannot make the assumption - // here of what happens to files that do not meet the expected extension. 
+ // At the moment triggering a recrawl will work the best + + } else { + log::info!("❌ Local file watcher is disabled"); + + match processed_files::remove_unmatched_paths(&state.db, Vec::new()).await { + Ok(removed) => { + let uri_list = removed + .iter() + .map(|model| model.file_path.clone()) + .collect::>(); + documents::delete_documents_by_uri(&state, uri_list).await; + } + Err(error) => log::error!( + "Error removing paths that are no longer being watched. {:?}", + error + ), + } + } + } else { + log::info!("❌ Local file watcher not installed"); + } } /// Helper method use to process updates from a watched path @@ -630,7 +659,7 @@ async fn _process_file_and_dir( } if !enqueue_list.is_empty() { - let tags = vec![(TagType::Lens, String::from("filesystem"))]; + let tags = vec![(TagType::Lens, String::from("files"))]; let enqueue_settings = EnqueueSettings { crawl_type: CrawlType::Normal, is_recrawl: true, diff --git a/crates/spyglass/src/filesystem/utils.rs b/crates/spyglass/src/filesystem/utils.rs index fbbf7bb78..083dccb7f 100644 --- a/crates/spyglass/src/filesystem/utils.rs +++ b/crates/spyglass/src/filesystem/utils.rs @@ -13,24 +13,15 @@ use crate::state::AppState; // Create a file URI pub fn path_to_uri(path: &Path) -> String { - let path_str = path.display().to_string(); - // Eventually this will be away to keep track of multiple devices and searching across - // them. - let host = ""; - - let mut new_url = Url::parse("file://").expect("Base URI"); - let _ = new_url.set_host(Some(host)); - // Fixes issues handling windows drive letters - let path_str = path_str.replace(':', "%3A"); - // Fixes an issue where DirEntry adds too many escapes. - let path_str = path_str.replace("\\\\", "\\"); - new_url.set_path(&path_str); - new_url.to_string() + path_string_to_uri(path.display().to_string()) } // Create a file URI pub fn path_buf_to_uri(path: &PathBuf) -> String { - let path_str = path.display().to_string(); + path_string_to_uri(path.display().to_string()) +} + +pub fn path_string_to_uri(path_str: String) -> String { // Eventually this will be away to keep track of multiple devices and searching across // them. let host = ""; @@ -40,7 +31,11 @@ pub fn path_buf_to_uri(path: &PathBuf) -> String { // Fixes issues handling windows drive letters let path_str = path_str.replace(':', "%3A"); // Fixes an issue where DirEntry adds too many escapes. - let path_str = path_str.replace("\\\\", "\\"); + let path_str = path_str.replace(r#"\\\\"#, r#"\"#); + let path_str = path_str.replace(r#"\\"#, r#"\"#); + + log::error!("Path string {:?}", path_str); + new_url.set_path(&path_str); new_url.to_string() } @@ -136,34 +131,38 @@ pub fn last_modified_time(path: &PathBuf) -> DateTime { /// user settings. 
pub fn get_search_directories(state: &AppState) -> Vec { let plugin_settings = state.user_settings.plugin_settings.clone(); - let local_file_settings = plugin_settings.get("local-file-importer").unwrap(); - let dir_list = local_file_settings.get("FOLDERS_LIST"); - - let directories = - if let Ok(dirs) = serde_json::from_str::>(dir_list.unwrap().as_str()) { - dirs - } else { - HashSet::new() - }; + if let Some(local_file_settings) = plugin_settings.get("local-file-importer") { + let dir_list = local_file_settings.get("FOLDERS_LIST"); - directories - .iter() - .map(|str| PathBuf::from(str)) - .collect::>() + let directories = + if let Ok(dirs) = serde_json::from_str::>(dir_list.unwrap().as_str()) { + dirs + } else { + HashSet::new() + }; + + directories + .iter() + .map(|str| PathBuf::from(str)) + .collect::>() + } else { + Vec::new() + } + } /// Helper method used to access the configured file extensions from /// user settings. pub fn get_supported_file_extensions(state: &AppState) -> HashSet { let plugin_settings = state.user_settings.plugin_settings.clone(); - let local_file_settings = plugin_settings.get("local-file-importer").unwrap(); - let ext_list = local_file_settings.get("EXTS_LIST"); - - if let Ok(exts) = serde_json::from_str::>(ext_list.unwrap().as_str()) { - exts - } else { - HashSet::new() + + if let Some(local_file_settings) = plugin_settings.get("local-file-importer") { + let ext_list = local_file_settings.get("EXTS_LIST"); + if let Ok(exts) = serde_json::from_str::>(ext_list.unwrap().as_str()) { + return exts; + } } + HashSet::new() } /// Helper method used to identify if the provided path represents a gitignore file diff --git a/plugins/local-file-indexer/src/main.rs b/plugins/local-file-indexer/src/main.rs index 0bec0b526..8dee2dff8 100644 --- a/plugins/local-file-indexer/src/main.rs +++ b/plugins/local-file-indexer/src/main.rs @@ -25,61 +25,8 @@ register_plugin!(Plugin); impl SpyglassPlugin for Plugin { fn load(&mut self) { - // List of supported file types - let default_exts = - HashSet::from_iter(vec!["md".to_string(), "txt".to_string()].into_iter()); - self.extensions = if let Ok(blob) = std::env::var(EXTS_LIST_ENV) { - if let Ok(exts) = serde_json::from_str(&blob) { - exts - } else { - default_exts - } - } else { - default_exts - }; - - // When paths were last synced - self.last_synced = if let Ok(blob) = std::fs::read_to_string(PLUGIN_DATA) { - serde_json::from_str::(&blob).map_or(Default::default(), |x| x) - } else { - Default::default() - }; - - let paths = if let Ok(blob) = std::env::var(FOLDERS_LIST_ENV) { - serde_json::from_str::>(&blob).map_or(Vec::new(), |x| x) - } else { - Vec::new() - }; - - for path in paths.iter().map(|path| Path::new(&path).to_path_buf()) { - let now = Utc::now(); - - let last_processed_time = self - .last_synced - .path_to_times - .entry(path.to_path_buf()) - .or_default(); - - let diff = now - *last_processed_time; - if diff.num_days() > 1 { - if let Err(e) = walk_and_enqueue_dir(path.to_path_buf(), &self.extensions) { - log(format!("Unable to process dir: {e}")); - } else { - *last_processed_time = now; - } - } - - // List to notifications - subscribe(PluginSubscription::WatchDirectory { - path: path.to_path_buf(), - recurse: true, - }); - } - - // Save list of processed paths to data dir - if let Ok(blob) = serde_json::to_string_pretty(&self.last_synced) { - let _ = std::fs::write(PLUGIN_DATA, blob); - } + // Noop, now handled internally. 
Plugin can be removed when settings + // are converted to core } fn update(&mut self, event: PluginEvent) { From b50394415e9f595fa9f232c6517362f537ffc577 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Sun, 29 Jan 2023 10:11:17 -0800 Subject: [PATCH 3/9] Update watcher to dynamically follow local filesystem plugin state Enabling plugin will enable the filesystem and start crawling, disabling will delete all filesystem documents. --- crates/spyglass/src/filesystem/mod.rs | 106 +++++++++++++++--------- crates/spyglass/src/filesystem/utils.rs | 9 +- crates/spyglass/src/plugin/mod.rs | 15 +++- 3 files changed, 83 insertions(+), 47 deletions(-) diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs index 388ed9855..84c69a07f 100644 --- a/crates/spyglass/src/filesystem/mod.rs +++ b/crates/spyglass/src/filesystem/mod.rs @@ -1,8 +1,8 @@ use dashmap::DashMap; use entities::models::crawl_queue::{self, CrawlType, EnqueueSettings}; +use entities::models::lens; use entities::models::processed_files; use entities::models::tag::TagType; -use entities::models::lens; use entities::sea_orm::entity::prelude::*; use entities::sea_orm::DatabaseConnection; use ignore::gitignore::Gitignore; @@ -50,7 +50,7 @@ pub struct SpyglassFileWatcher { /// be notified of system changes via the send and receiver #[derive(Debug)] pub struct WatchPath { - _path: PathBuf, + path: PathBuf, _uuid: String, extensions: Option>, tx_channel: Option>>, @@ -63,7 +63,7 @@ impl WatchPath { let uuid = Uuid::new_v4().as_hyphenated().to_string(); WatchPath { - _path: path.to_path_buf(), + path: path.to_path_buf(), _uuid: uuid.clone(), extensions, tx_channel: None::>>, @@ -145,7 +145,7 @@ async fn watch_events( file_events.close(); let mut watcher = state.file_watcher.lock().await; if let Some(watcher) = watcher.as_mut() { - watcher.close(); + watcher.close().await; } return; } @@ -286,15 +286,35 @@ impl SpyglassFileWatcher { } } + async fn remove_path(&mut self, path: &Path) { + if let Some((key, watchers)) = self.path_map.remove(&path.to_path_buf()) { + let _ = self.watcher.lock().await.watcher().unwatch(path); + for path in watchers { + if let Some(sender) = &path.tx_channel { + drop(sender); + } + } + } + } + /// Closes the watcher and associated resources - fn close(&mut self) { + async fn close(&mut self) { + self.ignore_files.clear(); + for path_ref in self.path_map.iter() { for path in path_ref.value() { + let _ = self + .watcher + .lock() + .await + .watcher() + .unwatch(path.path.as_path()); if let Some(sender) = &path.tx_channel { drop(sender); } } } + self.path_map.clear(); } /// Adds .gitignore files @@ -313,6 +333,10 @@ impl SpyglassFileWatcher { } } + fn is_path_initialized(&self, file: &Path) -> bool { + self.path_map.contains_key(&file.to_path_buf()) + } + /// filters the provided events and returns the list of events that should not /// be ignored fn filter_events(&self, events: &Vec) -> Vec { @@ -530,25 +554,28 @@ pub async fn configure_watcher(state: AppState) { .iter() .map(|path| utils::path_buf_to_uri(path)) .collect::>(); - + let mut watcher = state.file_watcher.lock().await; if let Some(watcher) = watcher.as_mut() { for path in paths { + if !watcher.is_path_initialized(path.as_path()) { + log::debug!("Adding {:?} to watch list", path); - let updates = watcher.initialize_path(path.as_path()).await; - let rx1 = watcher.watch_path(path.as_path(), None, true).await; - - tokio::spawn(_process_messages( - state.clone(), - rx1, - updates, - extension.clone(), - )); + let updates = 
watcher.initialize_path(path.as_path()).await; + let rx1 = watcher.watch_path(path.as_path(), None, true).await; + + tokio::spawn(_process_messages( + state.clone(), + rx1, + updates, + extension.clone(), + )); + } } } else { log::error!("Watcher is missing"); } - + match processed_files::remove_unmatched_paths(&state.db, path_names).await { Ok(removed) => { let uri_list = removed @@ -563,32 +590,35 @@ pub async fn configure_watcher(state: AppState) { ), } - // TODO remove the content from extensions that are no longer being processed, this should be the - // purview of the document handling and not the file handling since we cannot make the assumption - // here of what happens to files that do not meet the expected extension. + // TODO remove the content from extensions that are no longer being processed, this should be the + // purview of the document handling and not the file handling since we cannot make the assumption + // here of what happens to files that do not meet the expected extension. + // At the moment triggering a recrawl will work the best + } else { + log::info!("❌ Local file watcher is disabled"); - // At the moment triggering a recrawl will work the best + let mut watcher = state.file_watcher.lock().await; + if let Some(watcher) = watcher.as_mut() { + watcher.close().await; + } - } else { - log::info!("❌ Local file watcher is disabled"); - - match processed_files::remove_unmatched_paths(&state.db, Vec::new()).await { - Ok(removed) => { - let uri_list = removed - .iter() - .map(|model| model.file_path.clone()) - .collect::>(); - documents::delete_documents_by_uri(&state, uri_list).await; - } - Err(error) => log::error!( - "Error removing paths that are no longer being watched. {:?}", - error - ), - } + match processed_files::remove_unmatched_paths(&state.db, Vec::new()).await { + Ok(removed) => { + let uri_list = removed + .iter() + .map(|model| model.file_path.clone()) + .collect::>(); + documents::delete_documents_by_uri(&state, uri_list).await; } - } else { - log::info!("❌ Local file watcher not installed"); + Err(error) => log::error!( + "Error removing paths that are no longer being watched. {:?}", + error + ), + } + } + } else { + log::info!("❌ Local file watcher not installed"); } } diff --git a/crates/spyglass/src/filesystem/utils.rs b/crates/spyglass/src/filesystem/utils.rs index 083dccb7f..14efa6e9f 100644 --- a/crates/spyglass/src/filesystem/utils.rs +++ b/crates/spyglass/src/filesystem/utils.rs @@ -34,8 +34,6 @@ pub fn path_string_to_uri(path_str: String) -> String { let path_str = path_str.replace(r#"\\\\"#, r#"\"#); let path_str = path_str.replace(r#"\\"#, r#"\"#); - log::error!("Path string {:?}", path_str); - new_url.set_path(&path_str); new_url.to_string() } @@ -140,7 +138,7 @@ pub fn get_search_directories(state: &AppState) -> Vec { } else { HashSet::new() }; - + directories .iter() .map(|str| PathBuf::from(str)) @@ -148,19 +146,18 @@ pub fn get_search_directories(state: &AppState) -> Vec { } else { Vec::new() } - } /// Helper method used to access the configured file extensions from /// user settings. 
pub fn get_supported_file_extensions(state: &AppState) -> HashSet { let plugin_settings = state.user_settings.plugin_settings.clone(); - + if let Some(local_file_settings) = plugin_settings.get("local-file-importer") { let ext_list = local_file_settings.get("EXTS_LIST"); if let Ok(exts) = serde_json::from_str::>(ext_list.unwrap().as_str()) { return exts; - } + } } HashSet::new() } diff --git a/crates/spyglass/src/plugin/mod.rs b/crates/spyglass/src/plugin/mod.rs index 0f4f655d4..5b6017f44 100644 --- a/crates/spyglass/src/plugin/mod.rs +++ b/crates/spyglass/src/plugin/mod.rs @@ -21,6 +21,7 @@ use shared::config::{Config, LensConfig}; use shared::plugin::{PluginConfig, PluginType}; use spyglass_plugin::{consts::env, PluginEvent, PluginSubscription}; +use crate::filesystem; use crate::state::AppState; mod exports; @@ -219,7 +220,7 @@ pub async fn plugin_event_loop( let mut disabled = Vec::new(); let mut manager = state.plugin_manager.lock().await; - if let Some(plugin) = manager.find_by_name(plugin_name) { + if let Some(plugin) = manager.find_by_name(plugin_name.clone()) { if let Some(mut instance) = manager.plugins.get_mut(&plugin.id) { instance.config.is_enabled = false; disabled.push(plugin.id); @@ -228,12 +229,17 @@ pub async fn plugin_event_loop( disabled.iter().for_each(|pid| { manager.check_update_subs.remove(pid); - }) + }); + + if plugin_name.eq("local-file-importer") { + tokio::spawn(filesystem::configure_watcher(state.clone())); + } } Some(PluginCommand::EnablePlugin(plugin_name)) => { log::info!("enabling plugin <{}>", plugin_name); + let manager = state.plugin_manager.lock().await; - if let Some(plugin) = manager.find_by_name(plugin_name) { + if let Some(plugin) = manager.find_by_name(plugin_name.clone()) { if let Some(mut instance) = manager.plugins.get_mut(&plugin.id) { instance.config.is_enabled = true; // Re-initialize plugin @@ -242,6 +248,9 @@ pub async fn plugin_event_loop( .await; } } + if plugin_name.eq("local-file-importer") { + tokio::spawn(filesystem::configure_watcher(state.clone())); + } } Some(PluginCommand::HandleUpdate { plugin_id, event }) => { let manager = state.plugin_manager.lock().await; From 56f5f4a903f6314235178566b39494cae6155a71 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Mon, 30 Jan 2023 14:42:50 -0800 Subject: [PATCH 4/9] Update document processing to include tags on bulk update / refactor --- crates/entities/src/models/crawl_queue.rs | 7 +- .../entities/src/models/indexed_document.rs | 30 ++ crates/entities/src/models/processed_files.rs | 7 +- crates/entities/src/models/tag.rs | 42 +- .../src/m20230126_000001_create_file_table.rs | 17 +- crates/spyglass/src/crawler/mod.rs | 79 +--- crates/spyglass/src/documents/mod.rs | 406 +++++++++++++++++- crates/spyglass/src/filesystem/mod.rs | 146 +++++-- crates/spyglass/src/filesystem/utils.rs | 41 +- .../spyglass/src/pipeline/cache_pipeline.rs | 174 +------- crates/spyglass/src/plugin/mod.rs | 2 +- 11 files changed, 628 insertions(+), 323 deletions(-) diff --git a/crates/entities/src/models/crawl_queue.rs b/crates/entities/src/models/crawl_queue.rs index 919b0e589..8679f0ee0 100644 --- a/crates/entities/src/models/crawl_queue.rs +++ b/crates/entities/src/models/crawl_queue.rs @@ -525,7 +525,7 @@ pub async fn enqueue_local_files( OnConflict::column(Column::Url).do_nothing().to_owned() }; - let insert = Entity::insert_many(model) + let _insert = Entity::insert_many(model) .on_conflict(on_conflict) .exec(db) .await?; @@ -536,7 +536,7 @@ pub async fn enqueue_local_files( let ids = 
inserted_rows.iter().map(|row| row.id).collect::>(); let tag_rslt = insert_tags_many(&inserted_rows, db, &overrides.tags).await; - if let Ok(_) = tag_rslt { + if tag_rslt.is_ok() { let query = Query::update() .table(Entity.table_ref()) .values([(Column::Status, CrawlStatus::Queued.into())]) @@ -636,7 +636,6 @@ pub async fn enqueue_all( .await { Ok(_) => { - println!("tags {:?}", overrides.tags); if !overrides.tags.is_empty() { let inserted_rows = Entity::find() .filter(Column::Url.is_in(urls)) @@ -747,7 +746,7 @@ pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Resu .map(|x| x.id) .collect(); - let rows_affected = delete_many_by_id(&db, &dbids).await?; + let rows_affected = delete_many_by_id(db, &dbids).await?; Ok(rows_affected) } diff --git a/crates/entities/src/models/indexed_document.rs b/crates/entities/src/models/indexed_document.rs index 0e2b9c07c..31bda393b 100644 --- a/crates/entities/src/models/indexed_document.rs +++ b/crates/entities/src/models/indexed_document.rs @@ -173,6 +173,36 @@ pub async fn insert_tags_many( .await } +pub async fn insert_tags_for_document( + doc: &Model, + db: &C, + tags: &[u64], +) -> Result, DbErr> { + let doc_tags = tags + .iter() + .map(|tag_id| document_tag::ActiveModel { + indexed_document_id: Set(doc.id), + tag_id: Set(*tag_id as i64), + created_at: Set(chrono::Utc::now()), + updated_at: Set(chrono::Utc::now()), + ..Default::default() + }) + .collect::>(); + + // Insert connections, ignoring duplicates + document_tag::Entity::insert_many(doc_tags) + .on_conflict( + sea_orm::sea_query::OnConflict::columns(vec![ + document_tag::Column::IndexedDocumentId, + document_tag::Column::TagId, + ]) + .do_nothing() + .to_owned(), + ) + .exec(db) + .await +} + /// Remove documents from the indexed_document table that match `rule`. Rule is expected /// to be a SQL like statement. 
pub async fn remove_by_rule(db: &DatabaseConnection, rule: &str) -> anyhow::Result> { diff --git a/crates/entities/src/models/processed_files.rs b/crates/entities/src/models/processed_files.rs index f772e7d1d..0f61d09d5 100644 --- a/crates/entities/src/models/processed_files.rs +++ b/crates/entities/src/models/processed_files.rs @@ -1,7 +1,6 @@ use sea_orm::entity::prelude::*; use sea_orm::Set; -use serde::{Deserialize, Serialize}; -use url::Url; +use serde::Serialize; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Eq)] #[sea_orm(table_name = "processed_files")] @@ -36,7 +35,7 @@ impl ActiveModelBehavior for ActiveModel { } // Triggered before insert / update - fn before_save(mut self, insert: bool) -> Result { + fn before_save(self, _insert: bool) -> Result { Ok(self) } } @@ -70,6 +69,6 @@ pub async fn remove_unmatched_paths( } Ok(items) } - Err(error) => return Err(anyhow::Error::from(error)), + Err(error) => Err(anyhow::Error::from(error)), } } diff --git a/crates/entities/src/models/tag.rs b/crates/entities/src/models/tag.rs index 80faa12cc..6cf2836a3 100644 --- a/crates/entities/src/models/tag.rs +++ b/crates/entities/src/models/tag.rs @@ -1,5 +1,5 @@ -use sea_orm::Set; use sea_orm::{entity::prelude::*, ConnectionTrait}; +use sea_orm::{Condition, Set}; use serde::{Deserialize, Serialize}; use strum_macros::{AsRefStr, EnumString}; @@ -154,6 +154,46 @@ where } } +pub async fn get_or_create_many(db: &C, tags: &Vec) -> Result, DbErr> +where + C: ConnectionTrait, +{ + let tag_models = tags + .iter() + .map(|(label, value)| ActiveModel { + label: Set(label.clone()), + value: Set(value.to_string()), + created_at: Set(chrono::Utc::now()), + updated_at: Set(chrono::Utc::now()), + ..Default::default() + }) + .collect::>(); + + let _ = Entity::insert_many(tag_models) + .on_conflict( + sea_orm::sea_query::OnConflict::columns(vec![Column::Label, Column::Value]) + .do_nothing() + .to_owned(), + ) + .exec_with_returning(db) + .await; + + let mut condition = Condition::any(); + for (label, value) in tags { + condition = condition.add( + Condition::all() + .add(Column::Label.eq(label.clone())) + .add(Column::Value.eq(value.clone())), + ); + } + let db_tags = Entity::find().filter(condition).all(db).await; + + match db_tags { + Ok(models) => Ok(models), + Err(err) => Err(err), + } +} + #[cfg(test)] mod test { use crate::models::tag; diff --git a/crates/migrations/src/m20230126_000001_create_file_table.rs b/crates/migrations/src/m20230126_000001_create_file_table.rs index 80278a878..cd2de020c 100644 --- a/crates/migrations/src/m20230126_000001_create_file_table.rs +++ b/crates/migrations/src/m20230126_000001_create_file_table.rs @@ -19,16 +19,13 @@ impl MigrationTrait for Migration { "created_at" text NOT NULL, "last_modified" text NOT NULL);"#; - for sql in &[processed_files] { - manager - .get_connection() - .execute(Statement::from_string( - manager.get_database_backend(), - sql.to_owned().to_string(), - )) - .await?; - } - + manager + .get_connection() + .execute(Statement::from_string( + manager.get_database_backend(), + processed_files.to_owned().to_string(), + )) + .await?; Ok(()) } diff --git a/crates/spyglass/src/crawler/mod.rs b/crates/spyglass/src/crawler/mod.rs index 32024b64e..270794dd2 100644 --- a/crates/spyglass/src/crawler/mod.rs +++ b/crates/spyglass/src/crawler/mod.rs @@ -5,9 +5,7 @@ use addr::parse_domain_name; use anyhow::Result; use chrono::prelude::*; use chrono::Duration; -use entities::models::crawl_queue::EnqueueSettings; use entities::models::tag::TagPair; 
-use entities::models::tag::TagType; use percent_encoding::percent_decode_str; use sha2::{Digest, Sha256}; use thiserror::Error; @@ -467,34 +465,6 @@ impl Crawler { } } -fn _build_file_tags(path: &Path) -> Vec { - let mut tags = Vec::new(); - tags.push((TagType::Lens, String::from("filesystem"))); - if path.is_dir() { - tags.push((TagType::Type, String::from("directory"))); - } else if path.is_file() { - tags.push((TagType::Type, String::from("file"))); - let ext = path - .extension() - .and_then(|x| x.to_str()) - .map(|x| x.to_string()); - if let Some(ext) = ext { - tags.push((TagType::FileExt, ext)); - } - } - - if path.is_symlink() { - tags.push((TagType::Type, String::from("symlink"))) - } - - let guess = new_mime_guess::from_path(path.clone()); - for mime_guess in guess.iter() { - tags.push((TagType::MimeType, mime_guess.to_string())); - } - - tags -} - fn _process_file(path: &Path, file_name: String, url: &Url) -> Result { // Attempt to read file let contents = match path.extension() { @@ -527,7 +497,7 @@ fn _process_file(path: &Path, file_name: String, url: &Url) -> Result Result Result { let extension = filesystem::utils::get_supported_file_extensions(state); - if path.is_file() { - if _matches_ext(path, &extension) { - log::error!("Is a file {:?}", path); - _process_file(path, file_name, url) - } else { - let tags = _build_file_tags(path); - let mut hasher = Sha256::new(); - hasher.update(file_name.as_bytes()); - let content_hash = Some(hex::encode(&hasher.finalize()[..])); - Ok(CrawlResult { - content_hash, - content: Some(file_name.clone()), - // Does a file have a description? Pull the first part of the file - description: Some(file_name.clone()), - title: Some(url.to_string()), - url: url.to_string(), - open_url: Some(url.to_string()), - links: Default::default(), - tags: tags, - ..Default::default() - }) - } - } else if path.is_dir() { - let tags = _build_file_tags(path); - let mut hasher = Sha256::new(); - hasher.update(file_name.as_bytes()); - let content_hash = Some(hex::encode(&hasher.finalize()[..])); - Ok(CrawlResult { - content_hash, - content: Some(file_name.clone()), - // Does a file have a description? 
Pull the first part of the file - description: Some(file_name.clone()), - title: Some(url.to_string()), - url: url.to_string(), - open_url: Some(url.to_string()), - links: Default::default(), - tags: tags, - ..Default::default() - }) - } else { - return Err(CrawlError::NotFound); + if path.is_file() && _matches_ext(path, &extension) { + return _process_file(path, file_name, url); } + Err(CrawlError::NotFound) } fn _matches_ext(path: &Path, extension: &HashSet) -> bool { diff --git a/crates/spyglass/src/documents/mod.rs b/crates/spyglass/src/documents/mod.rs index ef7c10e7b..6ac5aaf62 100644 --- a/crates/spyglass/src/documents/mod.rs +++ b/crates/spyglass/src/documents/mod.rs @@ -1,9 +1,19 @@ -use std::collections::HashMap; +use entities::{ + models::{crawl_queue, indexed_document, tag}, + sea_orm::DatabaseConnection, +}; +use std::{collections::HashMap, time::Instant}; -use entities::models::{crawl_queue, indexed_document}; -use entities::sea_orm::entity::prelude::*; +use libnetrunner::parser::ParseResult; +use url::Url; -use crate::{search::Searcher, state::AppState}; +use crate::{ + crawler::CrawlResult, + search::{DocumentUpdate, Searcher}, + state::AppState, +}; +use entities::models::tag::TagType; +use entities::sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set, TransactionTrait}; /// Helper method to delete indexed documents, crawl queue items and search /// documents by url @@ -44,3 +54,391 @@ pub async fn delete_documents_by_uri(state: &AppState, uri: Vec) { log::error!("Error deleting for indexed document store {:?}", error); } } + +// Process a list of crawl results. The following steps will be taken: +// 1. Find all urls that already have been processed in the database +// 2. Remove any documents that already exist from the index +// 3. Add all new results to the index +// 4. 
Insert all new documents to the indexed document database +pub async fn process_crawl_results(state: &AppState, lens: &str, results: &mut Vec) { + let now = Instant::now(); + // get a list of all urls + let parsed_urls = results + .iter() + .map(|val| val.url.clone()) + .collect::>(); + + // find all documents that already exist with that url + let existing: Vec = indexed_document::Entity::find() + .filter(indexed_document::Column::Url.is_in(parsed_urls)) + .all(&state.db) + .await + .unwrap_or_default(); + + // build a hash map of Url to the doc id + let mut id_map = HashMap::new(); + for model in &existing { + let _ = id_map.insert(model.url.to_string(), model.doc_id.clone()); + } + + // build a list of doc ids to delete from the index + let doc_id_list = id_map + .values() + .into_iter() + .map(|x| x.to_owned()) + .collect::>(); + + let _ = Searcher::delete_many_by_id(state, &doc_id_list, false).await; + let _ = Searcher::save(state).await; + + // Access tag for this lens and build id list + let tag = tag::get_or_create(&state.db, TagType::Lens, lens).await; + let lens_tag = match tag { + Ok(model) => Some(model.id as u64), + Err(error) => { + log::error!("Error accessing tag for lens {:?}", error); + None + } + }; + + let mut tag_map: HashMap> = HashMap::new(); + let transaction_rslt = state.db.begin().await; + match transaction_rslt { + Ok(transaction) => { + let mut updates = Vec::new(); + let mut added_docs = Vec::new(); + let mut tag_cache = HashMap::new(); + for crawl_result in results { + let tags_option = _get_tags( + &state.db, + crawl_result, + &lens_tag, + &mut tag_map, + &mut tag_cache, + ) + .await; + + let canonical_url_str = crawl_result.url.clone(); + + let url_rslt = Url::parse(canonical_url_str.as_str()); + match url_rslt { + Ok(url) => { + let url_host = url.host_str().unwrap_or(""); + // Add document to index + let doc_id: Option = { + if let Ok(mut index_writer) = state.index.writer.lock() { + match Searcher::upsert_document( + &mut index_writer, + DocumentUpdate { + doc_id: id_map.get(&canonical_url_str).cloned(), + title: &crawl_result.title.clone().unwrap_or_default(), + description: &crawl_result + .description + .clone() + .unwrap_or_default(), + domain: url_host, + url: url.as_str(), + content: &crawl_result.content.clone().unwrap_or_default(), + tags: &tags_option, + }, + ) { + Ok(new_doc_id) => Some(new_doc_id), + _ => None, + } + } else { + None + } + }; + + if let Some(new_id) = doc_id { + if !id_map.contains_key(&new_id) { + added_docs.push(url.to_string()); + let update = indexed_document::ActiveModel { + domain: Set(url_host.to_string()), + url: Set(url.to_string()), + open_url: Set(Some(url.to_string())), + doc_id: Set(new_id), + ..Default::default() + }; + + updates.push(update); + } + } + } + Err(error) => log::error!( + "Error processing url: {:?} error: {:?}", + canonical_url_str, + error + ), + } + } + + let doc_insert = indexed_document::Entity::insert_many(updates) + .on_conflict( + entities::sea_orm::sea_query::OnConflict::columns(vec![ + indexed_document::Column::Url, + ]) + .do_nothing() + .to_owned(), + ) + .exec(&transaction) + .await; + + if let Err(error) = doc_insert { + log::error!("Docs failed to insert {:?}", error); + } + + let commit = transaction.commit().await; + match commit { + Ok(_) => { + if let Ok(mut writer) = state.index.writer.lock() { + let _ = writer.commit(); + } + + let added_entries: Vec = + indexed_document::Entity::find() + .filter(indexed_document::Column::Url.is_in(added_docs)) + .all(&state.db) + .await + 
.unwrap_or_default(); + + if !added_entries.is_empty() { + for added in added_entries { + if let Some(tag_ids) = tag_map.get(&added.url) { + let result = indexed_document::insert_tags_for_document( + &added, &state.db, tag_ids, + ) + .await; + if let Err(error) = result { + log::error!("Error inserting tags {:?}", error); + } + } + } + } + } + Err(error) => { + log::error!("Failed to commit transaction {:?}", error); + } + } + } + Err(err) => log::error!("Transaction failed {:?}", err), + } + + log::debug!( + "Took {:?} to process crawl results", + now.elapsed().as_millis() + ); +} + +// Process a list of crawl results. The following steps will be taken: +// 1. Find all urls that already have been processed in the database +// 2. Remove any documents that already exist from the index +// 3. Add all new results to the index +// 4. Insert all new documents to the indexed document database +pub async fn process_records(state: &AppState, lens: &str, results: &mut Vec) { + // get a list of all urls + let parsed_urls = results + .iter() + .map(|val| val.canonical_url.clone().unwrap_or_default()) + .collect::>(); + + // find all documents that already exist with that url + let existing: Vec = indexed_document::Entity::find() + .filter(indexed_document::Column::Url.is_in(parsed_urls)) + .all(&state.db) + .await + .unwrap_or_default(); + + // build a hash map of Url to the doc id + let mut id_map = HashMap::new(); + for model in &existing { + let _ = id_map.insert(model.url.to_string(), model.doc_id.clone()); + } + + // build a list of doc ids to delete from the index + let doc_id_list = id_map + .values() + .into_iter() + .map(|x| x.to_owned()) + .collect::>(); + + let _ = Searcher::delete_many_by_id(state, &doc_id_list, false).await; + let _ = Searcher::save(state).await; + + // Access tag for this lens and build id list + let tag = tag::get_or_create(&state.db, TagType::Lens, lens).await; + let tag_list = match tag { + Ok(model) => Some(vec![model.id as u64]), + Err(error) => { + log::error!("Error accessing tag for lens {:?}", error); + None + } + }; + + let transaction_rslt = state.db.begin().await; + match transaction_rslt { + Ok(transaction) => { + let mut updates = Vec::new(); + let mut added_docs = Vec::new(); + for crawl_result in results { + let canonical_url = crawl_result.canonical_url.clone(); + match canonical_url { + Some(canonical_url_str) => { + let url_rslt = Url::parse(canonical_url_str.as_str()); + match url_rslt { + Ok(url) => { + let url_host = url.host_str().unwrap_or(""); + // Add document to index + let doc_id: Option = { + if let Ok(mut index_writer) = state.index.writer.lock() { + match Searcher::upsert_document( + &mut index_writer, + DocumentUpdate { + doc_id: id_map.get(&canonical_url_str).cloned(), + title: &crawl_result + .title + .clone() + .unwrap_or_default(), + description: &crawl_result.description.clone(), + domain: url_host, + url: url.as_str(), + content: &crawl_result.content, + tags: &tag_list, + }, + ) { + Ok(new_doc_id) => Some(new_doc_id), + _ => None, + } + } else { + None + } + }; + + if let Some(new_id) = doc_id { + if !id_map.contains_key(&new_id) { + added_docs.push(url.to_string()); + let update = indexed_document::ActiveModel { + domain: Set(url_host.to_string()), + url: Set(url.to_string()), + open_url: Set(Some(url.to_string())), + doc_id: Set(new_id), + ..Default::default() + }; + + updates.push(update); + } + } + } + Err(error) => log::error!( + "Error processing url: {:?} error: {:?}", + canonical_url_str, + error + ), + } + } + None => 
log::error!( + "None url is not value for content {:?}", + crawl_result.content.truncate(80) + ), + } + } + + let doc_insert = indexed_document::Entity::insert_many(updates) + .on_conflict( + entities::sea_orm::sea_query::OnConflict::columns(vec![ + indexed_document::Column::Url, + ]) + .do_nothing() + .to_owned(), + ) + .exec(&transaction) + .await; + + if let Err(error) = doc_insert { + log::error!("Docs failed to insert {:?}", error); + } + + let commit = transaction.commit().await; + match commit { + Ok(_) => { + if let Ok(mut writer) = state.index.writer.lock() { + let _ = writer.commit(); + } + + let added_entries: Vec = + indexed_document::Entity::find() + .filter(indexed_document::Column::Url.is_in(added_docs)) + .all(&state.db) + .await + .unwrap_or_default(); + + if !added_entries.is_empty() { + let result = indexed_document::insert_tags_many( + &added_entries, + &state.db, + &[(TagType::Lens, lens.to_string())], + ) + .await; + if let Err(error) = result { + log::error!("Error inserting tags {:?}", error); + } + } + } + Err(error) => { + log::error!("Failed to commit transaction {:?}", error); + } + } + } + Err(err) => log::error!("Transaction failed {:?}", err), + } +} + +/// Helper method used to get the tag ids for a specific crawl result. The tag map and the tag cache +/// will be modified as results are processed. The tag map contains the url to tag it mapping used +/// for insertion to the database. The tag_cache is used to avoid additional loops for common tags +/// that have already been processed. +async fn _get_tags( + db: &DatabaseConnection, + result: &CrawlResult, + lens_tag: &Option, + tag_map: &mut HashMap>, + tag_cache: &mut HashMap, +) -> Option> { + if !result.tags.is_empty() { + let mut tags = Vec::new(); + let mut to_search = Vec::new(); + + for (tag_type, value) in &result.tags { + let uid = format!("{}:{}", tag_type, value); + match tag_cache.get(&uid) { + Some(tag) => { + tags.push(*tag); + } + None => { + to_search.push((tag_type.clone(), value.clone())); + } + } + } + + if !to_search.is_empty() { + match tag::get_or_create_many(db, &to_search).await { + Ok(tag_models) => { + for tag in tag_models { + let tag_id = tag.id as u64; + tags.push(tag_id); + tag_cache.insert(format!("{}:{}", tag.label, tag.value), tag_id); + } + } + Err(error) => { + log::error!("Error accessing or creating tags {:?}", error); + } + } + } + + if let Some(lens_tag) = lens_tag { + tags.push(*lens_tag); + } + tag_map.insert(result.url.clone(), tags.clone()); + } + None +} diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs index 84c69a07f..e18c23253 100644 --- a/crates/spyglass/src/filesystem/mod.rs +++ b/crates/spyglass/src/filesystem/mod.rs @@ -1,14 +1,19 @@ use dashmap::DashMap; use entities::models::crawl_queue::{self, CrawlType, EnqueueSettings}; -use entities::models::lens; -use entities::models::processed_files; -use entities::models::tag::TagType; +use entities::models::tag::{TagPair, TagType}; +use entities::models::{lens, processed_files}; use entities::sea_orm::entity::prelude::*; use entities::sea_orm::DatabaseConnection; use ignore::gitignore::Gitignore; use ignore::WalkBuilder; +use sha2::{Digest, Sha256}; +use url::Url; + +use crate::crawler::CrawlResult; +use crate::state::AppState; use entities::sea_orm::Set; +use entities::sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use migration::OnConflict; use std::sync::Arc; @@ -26,7 +31,6 @@ use uuid::Uuid; use notify_debouncer_mini::{DebouncedEvent, DebouncedEventKind, Debouncer}; 
use crate::documents; -use crate::state::AppState; pub mod utils; @@ -64,7 +68,7 @@ impl WatchPath { WatchPath { path: path.to_path_buf(), - _uuid: uuid.clone(), + _uuid: uuid, extensions, tx_channel: None::>>, } @@ -253,11 +257,11 @@ impl SpyglassFileWatcher { for event in events { if event.path.exists() { let mut model = processed_files::ActiveModel::new(); - model.file_path = Set(utils::path_buf_to_uri(&event.path)); + model.file_path = Set(utils::path_to_uri(&event.path)); model.last_modified = Set(utils::last_modified_time(&event.path)); inserts.push(model); } else { - removals.push(utils::path_buf_to_uri(&event.path)); + removals.push(utils::path_to_uri(&event.path)); } } @@ -286,14 +290,9 @@ impl SpyglassFileWatcher { } } - async fn remove_path(&mut self, path: &Path) { - if let Some((key, watchers)) = self.path_map.remove(&path.to_path_buf()) { + async fn _remove_path(&mut self, path: &Path) { + if let Some((_key, _watchers)) = self.path_map.remove(&path.to_path_buf()) { let _ = self.watcher.lock().await.watcher().unwatch(path); - for path in watchers { - if let Some(sender) = &path.tx_channel { - drop(sender); - } - } } } @@ -309,9 +308,6 @@ impl SpyglassFileWatcher { .await .watcher() .unwatch(path.path.as_path()); - if let Some(sender) = &path.tx_channel { - drop(sender); - } } } self.path_map.clear(); @@ -339,7 +335,7 @@ impl SpyglassFileWatcher { /// filters the provided events and returns the list of events that should not /// be ignored - fn filter_events(&self, events: &Vec) -> Vec { + fn filter_events(&self, events: &[DebouncedEvent]) -> Vec { events .iter() .filter_map(|evt| { @@ -391,10 +387,7 @@ impl SpyglassFileWatcher { let path_buf = path.to_path_buf(); let new_path = !self.path_map.contains_key(&path_buf); - self.path_map - .entry(path_buf) - .or_insert(Vec::new()) - .push(watch_path); + self.path_map.entry(path_buf).or_default().push(watch_path); let mode = if recursive { notify::RecursiveMode::Recursive @@ -491,13 +484,13 @@ impl SpyglassFileWatcher { .iter() .map(|path_ref| { debounced_events.push(DebouncedEvent { - path: utils::uri_to_path(&path_ref.key()).unwrap(), + path: utils::uri_to_path(path_ref.key()).unwrap(), kind: DebouncedEventKind::Any, }); let mut active_model = processed_files::ActiveModel::new(); active_model.file_path = Set(path_ref.key().clone()); - active_model.last_modified = Set(path_ref.value().clone()); + active_model.last_modified = Set(*path_ref.value()); active_model }) @@ -517,7 +510,7 @@ impl SpyglassFileWatcher { .map(|(uri, last_modified)| { let mut active_model = processed_files::ActiveModel::new(); active_model.file_path = Set(uri.clone()); - active_model.last_modified = Set(last_modified.clone()); + active_model.last_modified = Set(*last_modified); active_model }) @@ -552,18 +545,17 @@ pub async fn configure_watcher(state: AppState) { let paths = utils::get_search_directories(&state); let path_names = paths .iter() - .map(|path| utils::path_buf_to_uri(path)) + .map(|path| utils::path_to_uri(path)) .collect::>(); let mut watcher = state.file_watcher.lock().await; if let Some(watcher) = watcher.as_mut() { for path in paths { if !watcher.is_path_initialized(path.as_path()) { - - log::debug!("Adding {:?} to watch list", path); + log::debug!("Adding {:?} to watch list", path); let updates = watcher.initialize_path(path.as_path()).await; let rx1 = watcher.watch_path(path.as_path(), None, true).await; - + tokio::spawn(_process_messages( state.clone(), rx1, @@ -657,6 +649,7 @@ async fn _process_file_and_dir( extensions: &HashSet, ) -> 
anyhow::Result<()> { let mut enqueue_list = Vec::new(); + let mut general_processing = Vec::new(); let mut delete_list = Vec::new(); for event in events { let path = event.path; @@ -682,7 +675,16 @@ async fn _process_file_and_dir( } } - enqueue_list.push(uri); + let ext = &path + .extension() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()) + .unwrap_or_default(); + if extensions.contains(ext) { + enqueue_list.push(uri); + } else { + general_processing.push(uri); + } } else { delete_list.push(uri); } @@ -693,7 +695,7 @@ async fn _process_file_and_dir( let enqueue_settings = EnqueueSettings { crawl_type: CrawlType::Normal, is_recrawl: true, - tags: tags, + tags, force_allow: true, }; if let Err(error) = @@ -704,9 +706,89 @@ async fn _process_file_and_dir( } } + if !general_processing.is_empty() { + log::debug!("Adding {} general documents", general_processing.len()); + for general_chunk in general_processing.chunks(500) { + _process_general_file(state, general_chunk).await; + } + } + if !delete_list.is_empty() { - documents::delete_documents_by_uri(&state, delete_list).await; + documents::delete_documents_by_uri(state, delete_list).await; } Ok(()) } + +/// Generates the tags for a file +pub fn build_file_tags(path: &Path) -> Vec { + let mut tags = Vec::new(); + tags.push((TagType::Lens, String::from("files"))); + if path.is_dir() { + tags.push((TagType::Type, String::from("directory"))); + } else if path.is_file() { + tags.push((TagType::Type, String::from("file"))); + let ext = path + .extension() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()); + if let Some(ext) = ext { + tags.push((TagType::FileExt, ext)); + } + } + + if path.is_symlink() { + tags.push((TagType::Type, String::from("symlink"))) + } + + let guess = new_mime_guess::from_path(path); + for mime_guess in guess.iter() { + tags.push((TagType::MimeType, mime_guess.to_string())); + } + + tags +} + +// Helper method used process files +async fn _process_general_file(state: &AppState, file_uri: &[String]) { + let mut crawl_results = file_uri + .iter() + .filter_map(|uri| match Url::parse(uri) { + Ok(url) => match url.to_file_path() { + Ok(path) => _path_to_result(&url, &path), + Err(_) => None, + }, + Err(_) => None, + }) + .collect::>(); + + documents::process_crawl_results(state, "files", &mut crawl_results).await; +} + +// Process a path to parse result +fn _path_to_result(url: &Url, path: &Path) -> Option { + let file_name = path + .file_name() + .and_then(|x| x.to_str()) + .map(|x| x.to_string()) + .expect("Unable to convert path file name to string"); + let mut hasher = Sha256::new(); + hasher.update(file_name.as_bytes()); + let content_hash = hex::encode(&hasher.finalize()[..]); + let tags = build_file_tags(path); + if path.is_file() || path.is_dir() { + Some(CrawlResult { + content_hash: Some(content_hash), + content: Some(file_name.clone()), + // Does a file have a description? 
Pull the first part of the file + description: Some(file_name.clone()), + title: Some(url.to_string()), + url: url.to_string(), + open_url: Some(url.to_string()), + links: Default::default(), + tags, + }) + } else { + None + } +} diff --git a/crates/spyglass/src/filesystem/utils.rs b/crates/spyglass/src/filesystem/utils.rs index 14efa6e9f..077ffab55 100644 --- a/crates/spyglass/src/filesystem/utils.rs +++ b/crates/spyglass/src/filesystem/utils.rs @@ -16,10 +16,10 @@ pub fn path_to_uri(path: &Path) -> String { path_string_to_uri(path.display().to_string()) } -// Create a file URI -pub fn path_buf_to_uri(path: &PathBuf) -> String { - path_string_to_uri(path.display().to_string()) -} +// // Create a file URI +// pub fn path_buf_to_uri(path: &PathBuf) -> String { +// path_string_to_uri(path.display().to_string()) +// } pub fn path_string_to_uri(path_str: String) -> String { // Eventually this will be away to keep track of multiple devices and searching across @@ -39,7 +39,7 @@ pub fn path_string_to_uri(path_str: String) -> String { } /// Converts a uri to a valid path buf -pub fn uri_to_path(uri: &String) -> anyhow::Result { +pub fn uri_to_path(uri: &str) -> anyhow::Result { match Url::parse(uri) { Ok(url) => match url.to_file_path() { Ok(path) => Ok(path), @@ -89,7 +89,7 @@ pub fn last_modified_time_for_path(path: &Path) -> DateTime { if let Some(time) = NaiveDateTime::from_timestamp_millis(since_the_epoch.as_millis() as i64) { - return DateTime::::from_utc(time, Utc); + DateTime::::from_utc(time, Utc) } else { Utc::now() } @@ -103,7 +103,7 @@ pub fn last_modified_time_for_path(path: &Path) -> DateTime { /// Accessor for the last modified time for a file. If the last modified /// time is not available now is returned -pub fn last_modified_time(path: &PathBuf) -> DateTime { +pub fn last_modified_time(path: &Path) -> DateTime { if let Ok(metadata) = path.metadata() { if let Ok(modified) = metadata.modified() { let since_the_epoch = modified @@ -113,7 +113,7 @@ pub fn last_modified_time(path: &PathBuf) -> DateTime { if let Some(time) = NaiveDateTime::from_timestamp_millis(since_the_epoch.as_millis() as i64) { - return DateTime::::from_utc(time, Utc); + DateTime::::from_utc(time, Utc) } else { Utc::now() } @@ -141,7 +141,7 @@ pub fn get_search_directories(state: &AppState) -> Vec { directories .iter() - .map(|str| PathBuf::from(str)) + .map(PathBuf::from) .collect::>() } else { Vec::new() @@ -167,8 +167,7 @@ pub fn is_ignore_file(path: &Path) -> bool { if let Some(file_name) = path.file_name() { return file_name.eq(OsStr::new(".gitignore")); } - - return false; + false } /// Helper method used to convert a gitignore file into a processed gitignore object @@ -183,20 +182,18 @@ pub fn patterns_from_file(path: &Path) -> Result { /// .git /// .ssh pub fn is_in_hidden_dir(path: &Path) -> bool { - path.ancestors() - .find(|ancestor| { - if ancestor.is_dir() { - if let Some(name) = ancestor.file_name() { - if let Some(name_str) = name.to_str() { - if name_str.starts_with(".") { - return true; - } + path.ancestors().any(|ancestor| { + if ancestor.is_dir() { + if let Some(name) = ancestor.file_name() { + if let Some(name_str) = name.to_str() { + if name_str.starts_with('.') { + return true; } } } - false - }) - .is_some() + } + false + }) } #[cfg(test)] diff --git a/crates/spyglass/src/pipeline/cache_pipeline.rs b/crates/spyglass/src/pipeline/cache_pipeline.rs index 17aa92fbb..e9e458f48 100644 --- a/crates/spyglass/src/pipeline/cache_pipeline.rs +++ b/crates/spyglass/src/pipeline/cache_pipeline.rs @@ 
-1,22 +1,17 @@ -use std::collections::HashMap; use std::path::PathBuf; use std::time::Instant; use crate::crawler::{cache, CrawlResult}; +use crate::documents; use crate::pipeline::collector::CollectionResult; use crate::pipeline::PipelineContext; -use crate::search::{DocumentUpdate, Searcher}; use crate::state::AppState; -use entities::models::tag::TagType; -use entities::models::{indexed_document, tag}; use libnetrunner::parser::ParseResult; use tokio::task::JoinHandle; use super::parser::DefaultParser; use crate::crawler::archive; -use entities::sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set, TransactionTrait}; -use url::Url; /// processes the cache for a lens. The cache is streamed in from the provided path /// and processed. After the process is complete the cache is deleted @@ -91,13 +86,13 @@ pub async fn process_update(state: AppState, lens: String, cache_path: PathBuf) for record in record_iter.by_ref() { record_list.push(record); if record_list.len() >= 5000 { - process_records(&state, &lens, &mut record_list).await; + documents::process_records(&state, &lens, &mut record_list).await; record_list = Vec::new(); } } if !record_list.is_empty() { - process_records(&state, &lens, &mut record_list).await; + documents::process_records(&state, &lens, &mut record_list).await; } } @@ -105,166 +100,3 @@ pub async fn process_update(state: AppState, lens: String, cache_path: PathBuf) let _ = cache::delete_cache(&cache_path); log::debug!("Processing Cache Took: {:?}", now.elapsed().as_millis()) } - -// Process a list of crawl results. The following steps will be taken: -// 1. Find all urls that already have been processed in the database -// 2. Remove any documents that already exist from the index -// 3. Add all new results to the index -// 4. Insert all new documents to the indexed document database -async fn process_records(state: &AppState, lens: &str, results: &mut Vec) { - // get a list of all urls - let parsed_urls = results - .iter() - .map(|val| val.canonical_url.clone().unwrap_or_default()) - .collect::>(); - - // find all documents that already exist with that url - let existing: Vec = indexed_document::Entity::find() - .filter(indexed_document::Column::Url.is_in(parsed_urls)) - .all(&state.db) - .await - .unwrap_or_default(); - - // build a hash map of Url to the doc id - let mut id_map = HashMap::new(); - for model in &existing { - let _ = id_map.insert(model.url.to_string(), model.doc_id.clone()); - } - - // build a list of doc ids to delete from the index - let doc_id_list = id_map - .values() - .into_iter() - .map(|x| x.to_owned()) - .collect::>(); - - let _ = Searcher::delete_many_by_id(state, &doc_id_list, false).await; - let _ = Searcher::save(state).await; - - // Access tag for this lens and build id list - let tag = tag::get_or_create(&state.db, TagType::Lens, lens).await; - let tag_list = match tag { - Ok(model) => Some(vec![model.id as u64]), - Err(error) => { - log::error!("Error accessing tag for lens {:?}", error); - None - } - }; - - let transaction_rslt = state.db.begin().await; - match transaction_rslt { - Ok(transaction) => { - let mut updates = Vec::new(); - let mut added_docs = Vec::new(); - for crawl_result in results { - let canonical_url = crawl_result.canonical_url.clone(); - match canonical_url { - Some(canonical_url_str) => { - let url_rslt = Url::parse(canonical_url_str.as_str()); - match url_rslt { - Ok(url) => { - let url_host = url.host_str().unwrap_or(""); - // Add document to index - let doc_id: Option = { - if let Ok(mut index_writer) = 
state.index.writer.lock() { - match Searcher::upsert_document( - &mut index_writer, - DocumentUpdate { - doc_id: id_map.get(&canonical_url_str).cloned(), - title: &crawl_result - .title - .clone() - .unwrap_or_default(), - description: &crawl_result.description.clone(), - domain: url_host, - url: url.as_str(), - content: &crawl_result.content, - tags: &tag_list, - }, - ) { - Ok(new_doc_id) => Some(new_doc_id), - _ => None, - } - } else { - None - } - }; - - if let Some(new_id) = doc_id { - if !id_map.contains_key(&new_id) { - added_docs.push(url.to_string()); - let update = indexed_document::ActiveModel { - domain: Set(url_host.to_string()), - url: Set(url.to_string()), - open_url: Set(Some(url.to_string())), - doc_id: Set(new_id), - ..Default::default() - }; - - updates.push(update); - } - } - } - Err(error) => log::error!( - "Error processing url: {:?} error: {:?}", - canonical_url_str, - error - ), - } - } - None => log::error!( - "None url is not value for content {:?}", - crawl_result.content.truncate(80) - ), - } - } - - let doc_insert = indexed_document::Entity::insert_many(updates) - .on_conflict( - entities::sea_orm::sea_query::OnConflict::columns(vec![ - indexed_document::Column::Url, - ]) - .do_nothing() - .to_owned(), - ) - .exec(&transaction) - .await; - - if let Err(error) = doc_insert { - log::error!("Docs failed to insert {:?}", error); - } - - let commit = transaction.commit().await; - match commit { - Ok(_) => { - if let Ok(mut writer) = state.index.writer.lock() { - let _ = writer.commit(); - } - - let added_entries: Vec = - indexed_document::Entity::find() - .filter(indexed_document::Column::Url.is_in(added_docs)) - .all(&state.db) - .await - .unwrap_or_default(); - - if !added_entries.is_empty() { - let result = indexed_document::insert_tags_many( - &added_entries, - &state.db, - &[(TagType::Lens, lens.to_string())], - ) - .await; - if let Err(error) = result { - log::error!("Error inserting tags {:?}", error); - } - } - } - Err(error) => { - log::error!("Failed to commit transaction {:?}", error); - } - } - } - Err(err) => log::error!("Transaction failed {:?}", err), - } -} diff --git a/crates/spyglass/src/plugin/mod.rs b/crates/spyglass/src/plugin/mod.rs index 5b6017f44..d0e53dc93 100644 --- a/crates/spyglass/src/plugin/mod.rs +++ b/crates/spyglass/src/plugin/mod.rs @@ -237,7 +237,7 @@ pub async fn plugin_event_loop( } Some(PluginCommand::EnablePlugin(plugin_name)) => { log::info!("enabling plugin <{}>", plugin_name); - + let manager = state.plugin_manager.lock().await; if let Some(plugin) = manager.find_by_name(plugin_name.clone()) { if let Some(mut instance) = manager.plugins.get_mut(&plugin.id) { From 1d7ccbf501bce12940396429ed89b5287d219281 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Mon, 30 Jan 2023 15:45:32 -0800 Subject: [PATCH 5/9] Change type to use type supported on multiple platforms --- crates/spyglass/src/filesystem/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs index e18c23253..1bfbaabca 100644 --- a/crates/spyglass/src/filesystem/mod.rs +++ b/crates/spyglass/src/filesystem/mod.rs @@ -23,7 +23,7 @@ use std::{ path::{Path, PathBuf}, }; -use notify::ReadDirectoryChangesWatcher; +use notify::RecommendedWatcher; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use uuid::Uuid; @@ -40,7 +40,7 @@ pub mod utils; /// Any updates that make it through will be passed to listeners pub struct SpyglassFileWatcher { // The director 
watcher services - watcher: Arc>>, + watcher: Arc>>, // The map of path being watched to the list of watchers path_map: DashMap>, // Map of .gitignore file path to the ignore file processor From 807c18c6168805396235caa2cff70c062c7a10ff Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Mon, 30 Jan 2023 16:27:55 -0800 Subject: [PATCH 6/9] Update unit test --- crates/entities/src/models/crawl_queue.rs | 2 +- crates/entities/src/models/sql/dequeue.sqlx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/entities/src/models/crawl_queue.rs b/crates/entities/src/models/crawl_queue.rs index 8679f0ee0..3975cad77 100644 --- a/crates/entities/src/models/crawl_queue.rs +++ b/crates/entities/src/models/crawl_queue.rs @@ -904,7 +904,7 @@ mod test { let sql = gen_dequeue_sql(settings); assert_eq!( sql.to_string(), - "WITH\nindexed AS (\n SELECT\n domain,\n count(*) as count\n FROM indexed_document\n GROUP BY domain\n),\ninflight AS (\n SELECT\n domain,\n count(*) as count\n FROM crawl_queue\n WHERE status = \"Processing\"\n GROUP BY domain\n)\nSELECT\n cq.*\nFROM crawl_queue cq\nLEFT JOIN indexed ON indexed.domain = cq.domain\nLEFT JOIN inflight ON inflight.domain = cq.domain\nWHERE\n COALESCE(indexed.count, 0) < 500000 AND\n COALESCE(inflight.count, 0) < 2 AND\n status = \"Queued\"\nORDER BY\n cq.updated_at ASC" + "WITH\nindexed AS (\n SELECT\n domain,\n count(*) as count\n FROM indexed_document\n GROUP BY domain\n),\ninflight AS (\n SELECT\n domain,\n count(*) as count\n FROM crawl_queue\n WHERE status = \"Processing\"\n GROUP BY domain\n)\nSELECT\n cq.*\nFROM crawl_queue cq\nLEFT JOIN indexed ON indexed.domain = cq.domain\nLEFT JOIN inflight ON inflight.domain = cq.domain\nWHERE\n COALESCE(indexed.count, 0) < 500000 AND\n COALESCE(inflight.count, 0) < 2 AND\n status = \"Queued\" and\n url not like \"file%\"\nORDER BY\n cq.updated_at ASC" ); } diff --git a/crates/entities/src/models/sql/dequeue.sqlx b/crates/entities/src/models/sql/dequeue.sqlx index f6c20b6ca..3d5fef744 100644 --- a/crates/entities/src/models/sql/dequeue.sqlx +++ b/crates/entities/src/models/sql/dequeue.sqlx @@ -22,7 +22,7 @@ LEFT JOIN inflight ON inflight.domain = cq.domain WHERE COALESCE(indexed.count, 0) < ? AND COALESCE(inflight.count, 0) < ? 
AND - status = "Queued" and + status = "Queued" and url not like "file%" ORDER BY cq.updated_at ASC \ No newline at end of file From afcbc0755d87b7846c488c0ff897fcccad681629 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Mon, 30 Jan 2023 16:43:53 -0800 Subject: [PATCH 7/9] Update crawler to not check file extensions --- crates/spyglass/src/crawler/mod.rs | 6 ++---- crates/spyglass/src/filesystem/utils.rs | 4 ++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/spyglass/src/crawler/mod.rs b/crates/spyglass/src/crawler/mod.rs index 270794dd2..179c7f4ce 100644 --- a/crates/spyglass/src/crawler/mod.rs +++ b/crates/spyglass/src/crawler/mod.rs @@ -512,14 +512,12 @@ fn _process_file(path: &Path, file_name: String, url: &Url) -> Result Result { - let extension = filesystem::utils::get_supported_file_extensions(state); - - if path.is_file() && _matches_ext(path, &extension) { + if path.is_file() { return _process_file(path, file_name, url); } Err(CrawlError::NotFound) diff --git a/crates/spyglass/src/filesystem/utils.rs b/crates/spyglass/src/filesystem/utils.rs index 077ffab55..6b8c348ef 100644 --- a/crates/spyglass/src/filesystem/utils.rs +++ b/crates/spyglass/src/filesystem/utils.rs @@ -217,9 +217,9 @@ mod test { let uri = path_to_uri(&test_path.to_path_buf()); #[cfg(target_os = "windows")] - assert_eq!(uri, "file://localhost/C%3A/tmp/path_to_uri/test.txt"); + assert_eq!(uri, "file:///C%3A/tmp/path_to_uri/test.txt"); #[cfg(not(target_os = "windows"))] - assert_eq!(uri, "file://localhost/tmp/path_to_uri/test.txt"); + assert_eq!(uri, "file:///tmp/path_to_uri/test.txt"); let url = Url::parse(&uri).unwrap(); let file_path = url.to_file_path().unwrap(); From b751b633e397d83363607428b2924ec2f7dcfd79 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Mon, 30 Jan 2023 18:39:21 -0800 Subject: [PATCH 8/9] clippy update --- plugins/local-file-indexer/src/main.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/plugins/local-file-indexer/src/main.rs b/plugins/local-file-indexer/src/main.rs index 8dee2dff8..440b0b098 100644 --- a/plugins/local-file-indexer/src/main.rs +++ b/plugins/local-file-indexer/src/main.rs @@ -1,6 +1,6 @@ use chrono::prelude::*; use std::collections::{HashMap, HashSet}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use serde::{Deserialize, Serialize}; use spyglass_plugin::utils::path_to_uri; @@ -9,13 +9,9 @@ use spyglass_plugin::*; #[derive(Default)] struct Plugin { extensions: HashSet, - last_synced: SyncData, + _last_synced: SyncData, } -const PLUGIN_DATA: &str = "/data.json"; -const FOLDERS_LIST_ENV: &str = "FOLDERS_LIST"; -const EXTS_LIST_ENV: &str = "EXTS_LIST"; - #[derive(Default, Deserialize, Serialize)] struct SyncData { path_to_times: HashMap>, From 729020cd591b2bcc019f840a0d64984ca9c3b1d4 Mon Sep 17 00:00:00 2001 From: Joel Bredeson Date: Mon, 30 Jan 2023 19:52:07 -0800 Subject: [PATCH 9/9] Additional clippy updates --- crates/client/src/components/lens.rs | 2 +- crates/client/src/pages/wizard/menubar_help.rs | 2 +- crates/entities/src/models/processed_files.rs | 2 +- crates/spyglass/src/documents/mod.rs | 2 +- crates/spyglass/src/filesystem/mod.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/client/src/components/lens.rs b/crates/client/src/components/lens.rs index 791b3f53b..e9707e0f9 100644 --- a/crates/client/src/components/lens.rs +++ b/crates/client/src/components/lens.rs @@ -85,7 +85,7 @@ pub fn lens_component(props: &LensProps) -> Html { }} {"Uninstall"} -
{format!("{} docs", buf)}
+
{format!("{buf} docs")}
} } diff --git a/crates/client/src/pages/wizard/menubar_help.rs b/crates/client/src/pages/wizard/menubar_help.rs index 8e6cd9406..0cd0bb5cc 100644 --- a/crates/client/src/pages/wizard/menubar_help.rs +++ b/crates/client/src/pages/wizard/menubar_help.rs @@ -19,7 +19,7 @@ pub fn menubar_help() -> Html { html! {
Location of the menubar menu -
{format!("Spyglass lives in your {}.", menubar_name)}
+
{format!("Spyglass lives in your {menubar_name}.")}
{format!("{click_str} on the icon to access your library, discover new lenses, and adjust your settings.")}
diff --git a/crates/entities/src/models/processed_files.rs b/crates/entities/src/models/processed_files.rs index 0f61d09d5..3846d179a 100644 --- a/crates/entities/src/models/processed_files.rs +++ b/crates/entities/src/models/processed_files.rs @@ -49,7 +49,7 @@ pub async fn remove_unmatched_paths( let mut find = Entity::find(); if !paths.is_empty() { for path in paths { - find = find.filter(Column::FilePath.not_like(format!("{}%", path).as_str())); + find = find.filter(Column::FilePath.not_like(format!("{path}%").as_str())); } } else { log::debug!("No paths being watched removing all."); diff --git a/crates/spyglass/src/documents/mod.rs b/crates/spyglass/src/documents/mod.rs index 6ac5aaf62..dc8d3393f 100644 --- a/crates/spyglass/src/documents/mod.rs +++ b/crates/spyglass/src/documents/mod.rs @@ -409,7 +409,7 @@ async fn _get_tags( let mut to_search = Vec::new(); for (tag_type, value) in &result.tags { - let uid = format!("{}:{}", tag_type, value); + let uid = format!("{tag_type}:{value}"); match tag_cache.get(&uid) { Some(tag) => { tags.push(*tag); diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs index 1bfbaabca..fe71e909e 100644 --- a/crates/spyglass/src/filesystem/mod.rs +++ b/crates/spyglass/src/filesystem/mod.rs @@ -244,7 +244,7 @@ impl SpyglassFileWatcher { db: state.db.clone(), }; - let _ = tokio::spawn(watch_events(state.clone(), file_events)); + tokio::spawn(watch_events(state.clone(), file_events)); spy_watcher }
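
For readers following the URI handling introduced in this series, below is a minimal standalone sketch, not part of the patches themselves, of the file-URI shape that path_string_to_uri() produces and that the updated utils.rs test in PATCH 7/9 asserts. It assumes only the url crate that these crates already depend on; the literal URI comes from that test.

// Sketch only (assumptions noted above): illustrates the normalized file URI form.
use url::Url;

fn main() {
    // Host component dropped, Windows drive-letter colon percent-encoded as %3A.
    let uri = "file:///C%3A/tmp/path_to_uri/test.txt";
    let parsed = Url::parse(uri).expect("file URIs in this form parse cleanly");
    assert_eq!(parsed.scheme(), "file");
    // Percent-encoding is preserved in the serialized path.
    assert_eq!(parsed.path(), "/C%3A/tmp/path_to_uri/test.txt");
    println!("path component: {}", parsed.path());
}

Dropping the localhost host and percent-encoding the drive-letter colon keeps these URIs parseable by Url::parse on every platform, which is what lets uri_to_path() round-trip them back into a PathBuf via Url::to_file_path().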