From 5e64ecc89c3ec9182877309fb9334556aebe022d Mon Sep 17 00:00:00 2001
From: Daniel Cadenas
Date: Wed, 18 Sep 2024 18:11:26 -0300
Subject: [PATCH] Tweaks + caching

---
 src/http_server/router.rs | 46 +++++++++++++++++++-----
 src/repo.rs               |  6 ++--
 src/scheduler.rs          | 75 ++++++++++++++++++++++-----------------
 3 files changed, 83 insertions(+), 44 deletions(-)

diff --git a/src/http_server/router.rs b/src/http_server/router.rs
index f7d005c..14ccf5c 100644
--- a/src/http_server/router.rs
+++ b/src/http_server/router.rs
@@ -15,6 +15,7 @@ use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
 use tower_http::LatencyUnit;
 use tower_http::{timeout::TimeoutLayer, trace::DefaultOnFailure};
 use tracing::Level;
+use tracing::{error, info};
 
 pub fn create_router(state: Arc>) -> Result
 where
@@ -52,12 +53,21 @@ where
     T: RepoTrait,
 {
     let public_key = PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey)?;
+    if let Some(cached_recommendation_result) = state.recommendation_cache.get(&pubkey).await {
+        return Ok(Json(cached_recommendation_result));
+    }
+
     let recommendations = state
         .repo
         .get_recommendations(&public_key)
         .await
         .map_err(ApiError::from)?;
 
+    state
+        .recommendation_cache
+        .insert(pubkey, recommendations.clone())
+        .await;
+
     Ok(Json(recommendations))
 }
 
@@ -68,21 +78,38 @@ async fn maybe_spammer(
 where
     T: RepoTrait,
 {
-    let Ok(public_key) = PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey)
-    else {
-        return Json(false);
+    let public_key = match PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey) {
+        Ok(public_key) => public_key,
+        Err(e) => {
+            error!("Invalid public key: {}", e);
+            return Json(true);
+        }
     };
 
-    let Ok(pagerank) = state
+    if let Some(cached_spammer_result) = state.spammer_cache.get(&pubkey).await {
+        return Json(cached_spammer_result);
+    }
+
+    match state
         .repo
         .get_pagerank(&public_key)
         .await
         .map_err(ApiError::from)
-    else {
-        return Json(false);
-    };
-
-    Json(pagerank > 0.5)
+    {
+        Ok(pagerank) => {
+            info!("Pagerank for {}: {}", public_key.to_hex(), pagerank);
+            let is_spammer = pagerank < 0.2;
+            state.spammer_cache.insert(pubkey, is_spammer).await;
+            Json(is_spammer)
+            // TODO don't return if it's too low, instead use other manual
+            // checks, nos user, nip05, check network, more hints, and only then
+            // give up
+        }
+        Err(e) => {
+            error!("Failed to get pagerank: {}", e);
+            Json(true)
+        }
+    }
 }
 
 async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse {
@@ -122,6 +149,7 @@
             ),
             ApiError::AxumError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Axum error".to_string()),
         };
+        error!("Api error: {}", body);
         (status, body).into_response()
     }
 }
diff --git a/src/repo.rs b/src/repo.rs
index ab4acba..18e451d 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -462,7 +462,7 @@ impl RepoTrait for Repo {
             // Step 1: Get valid target nodes
             MATCH (source:User {pubkey: $pubkey_val})
             MATCH (target:User)
-            WHERE target.pagerank >= 0.1
+            WHERE target.pagerank >= 0.3
             AND NOT EXISTS {
                 MATCH (source)-[:FOLLOWS]->(target)
             }
@@ -472,8 +472,8 @@
             CALL gds.nodeSimilarity.filtered.stream('filteredGraph', {
                 sourceNodeFilter: [id(source)],
                 targetNodeFilter: targetNodeIds,
-                topK: 10, // Top 10 similar users
-                similarityCutoff: 0.05 // Only include nodes with similarity >= 0.1
+                topK: 10,
+                similarityCutoff: 0.1
             })
             YIELD node1, node2, similarity
             WITH gds.util.asNode(node2) AS targetUser, similarity
diff --git a/src/scheduler.rs b/src/scheduler.rs
index a6b0f0e..1b6dcaf 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -18,6 +18,10 @@ pub async fn start_scheduler(
 where
     T: RepoTrait + 'static,
 {
+    // For the moment, ensure we always trigger one update on startup
+    refresh_pagerank(repo.clone()).await;
+
+    // And then once every day
     let mut sched = JobScheduler::new().await?;
     let cron_expression = settings.pagerank_cron_expression.as_str();
     let repo_clone = Arc::clone(&repo);
@@ -28,38 +32,7 @@
         Box::pin(async move {
             info!("Starting scheduled PageRank update...");
-            let start_time = Instant::now();
-
-            let retry_strategy = ExponentialBackoff::from_millis(100)
-                .max_delay(Duration::from_secs(10))
-                .map(jitter);
-
-            let result = Retry::spawn(retry_strategy, || async {
-                if let Err(e) = repo_inner.update_memory_graph().await {
-                    error!("Memory graph update failed: {:?}", e);
-                    return Err(e);
-                }
-
-                if let Err(e) = repo_inner.update_pagerank().await {
-                    error!("Failed to update PageRank: {:?}", e);
-                    return Err(e);
-                }
-
-                Ok(())
-            })
-            .await;
-
-            let elapsed = start_time.elapsed();
-
-            match result {
-                Ok(_) => info!("PageRank updated successfully in {:?}", elapsed),
-                Err(e) => error!(
-                    "Failed to update PageRank after retries in {:?}: {:?}",
-                    elapsed, e
-                ),
-            }
-
-            metrics::pagerank_seconds().set(elapsed.as_secs_f64());
+            refresh_pagerank(repo_inner).await;
         })
     })?;
@@ -79,3 +52,41 @@
     Ok(())
 }
+
+async fn refresh_pagerank<T>(repo_inner: Arc<T>)
+where
+    T: RepoTrait + 'static,
+{
+    let start_time = Instant::now();
+
+    let retry_strategy = ExponentialBackoff::from_millis(100)
+        .max_delay(Duration::from_secs(10))
+        .map(jitter);
+
+    let result = Retry::spawn(retry_strategy, || async {
+        if let Err(e) = repo_inner.update_memory_graph().await {
+            error!("Memory graph update failed: {:?}", e);
+            return Err(e);
+        }
+
+        if let Err(e) = repo_inner.update_pagerank().await {
+            error!("Failed to update PageRank: {:?}", e);
+            return Err(e);
+        }
+
+        Ok(())
+    })
+    .await;
+
+    let elapsed = start_time.elapsed();
+
+    match result {
+        Ok(_) => info!("PageRank updated successfully in {:?}", elapsed),
+        Err(e) => error!(
+            "Failed to update PageRank after retries in {:?}: {:?}",
+            elapsed, e
+        ),
+    }
+
+    metrics::pagerank_seconds().set(elapsed.as_secs_f64());
+}