Skip to content

Commit

Permalink
Tweaks + caching
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 18, 2024
1 parent 06ebde5 commit 5e64ecc
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 44 deletions.
46 changes: 37 additions & 9 deletions src/http_server/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
use tower_http::{timeout::TimeoutLayer, trace::DefaultOnFailure};
use tracing::Level;
use tracing::{error, info};

pub fn create_router<T>(state: Arc<AppState<T>>) -> Result<Router>
where
Expand Down Expand Up @@ -52,12 +53,21 @@ where
T: RepoTrait,
{
let public_key = PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey)?;
if let Some(cached_recommendation_result) = state.recommendation_cache.get(&pubkey).await {
return Ok(Json(cached_recommendation_result));
}

let recommendations = state
.repo
.get_recommendations(&public_key)
.await
.map_err(ApiError::from)?;

state
.recommendation_cache
.insert(pubkey, recommendations.clone())
.await;

Ok(Json(recommendations))
}

Expand All @@ -68,21 +78,38 @@ async fn maybe_spammer<T>(
where
T: RepoTrait,
{
let Ok(public_key) = PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey)
else {
return Json(false);
let public_key = match PublicKey::from_hex(&pubkey).map_err(|_| ApiError::InvalidPublicKey) {
Ok(public_key) => public_key,
Err(e) => {
error!("Invalid public key: {}", e);
return Json(true);
}
};

let Ok(pagerank) = state
if let Some(cached_spammer_result) = state.spammer_cache.get(&pubkey).await {
return Json(cached_spammer_result);
}

match state
.repo
.get_pagerank(&public_key)
.await
.map_err(ApiError::from)
else {
return Json(false);
};

Json(pagerank > 0.5)
{
Ok(pagerank) => {
info!("Pagerank for {}: {}", public_key.to_hex(), pagerank);
let is_spammer = pagerank < 0.2;
state.spammer_cache.insert(pubkey, is_spammer).await;
Json(is_spammer)
// TODO: don't return a verdict immediately when the pagerank is too low;
// instead run other manual checks first (nos user, nip05, check network,
// more hints) and only then give up
}
Err(e) => {
error!("Invalid public key: {}", e);
Json(true)
}
}
}

async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse {
Expand Down Expand Up @@ -122,6 +149,7 @@ impl IntoResponse for ApiError {
),
ApiError::AxumError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Axum error".to_string()),
};
error!("Api error: {}", body);
(status, body).into_response()
}
}
6 changes: 3 additions & 3 deletions src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl RepoTrait for Repo {
// Step 1: Get valid target nodes
MATCH (source:User {pubkey: $pubkey_val})
MATCH (target:User)
WHERE target.pagerank >= 0.1
WHERE target.pagerank >= 0.3
AND NOT EXISTS {
MATCH (source)-[:FOLLOWS]->(target)
}
Expand All @@ -472,8 +472,8 @@ impl RepoTrait for Repo {
CALL gds.nodeSimilarity.filtered.stream('filteredGraph', {
sourceNodeFilter: [id(source)],
targetNodeFilter: targetNodeIds,
topK: 10, // Top 10 similar users
similarityCutoff: 0.05 // Only include nodes with similarity >= 0.1
topK: 10,
similarityCutoff: 0.1
})
YIELD node1, node2, similarity
WITH gds.util.asNode(node2) AS targetUser, similarity
Expand Down
75 changes: 43 additions & 32 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub async fn start_scheduler<T>(
where
T: RepoTrait + 'static,
{
// For the moment ensure we always trigger one update on startup
refresh_pagerank(repo.clone()).await;

// And then once every day
let mut sched = JobScheduler::new().await?;
let cron_expression = settings.pagerank_cron_expression.as_str();
let repo_clone = Arc::clone(&repo);
Expand All @@ -28,38 +32,7 @@ where
Box::pin(async move {
info!("Starting scheduled PageRank update...");

let start_time = Instant::now();

let retry_strategy = ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(10))
.map(jitter);

let result = Retry::spawn(retry_strategy, || async {
if let Err(e) = repo_inner.update_memory_graph().await {
error!("Memory graph update failed: {:?}", e);
return Err(e);
}

if let Err(e) = repo_inner.update_pagerank().await {
error!("Failed to update PageRank: {:?}", e);
return Err(e);
}

Ok(())
})
.await;

let elapsed = start_time.elapsed();

match result {
Ok(_) => info!("PageRank updated successfully in {:?}", elapsed),
Err(e) => error!(
"Failed to update PageRank after retries in {:?}: {:?}",
elapsed, e
),
}

metrics::pagerank_seconds().set(elapsed.as_secs_f64());
refresh_pagerank(repo_inner).await;
})
})?;

Expand All @@ -79,3 +52,41 @@ where

Ok(())
}

/// Runs one PageRank refresh cycle against `repo_inner`, retrying on failure.
///
/// Each attempt calls `update_memory_graph` followed by `update_pagerank`;
/// if either returns an error the whole attempt is retried with jittered
/// exponential backoff (delays capped at 10 seconds). After the retry budget
/// is exhausted the failure is logged and the function returns normally, so
/// callers are never blocked indefinitely and never see an error.
///
/// The total elapsed time (including all retries) is recorded in the
/// `pagerank_seconds` metric whether or not the refresh succeeded.
async fn refresh_pagerank<T>(repo_inner: Arc<T>)
where
    T: RepoTrait + 'static,
{
    // `ExponentialBackoff` yields delays forever, so the strategy must be
    // bounded explicitly. Without a cap `Retry::spawn` never gives up, the
    // "after retries" error branch below is unreachable, and awaiting this
    // function stalls for as long as the repository keeps failing.
    const MAX_RETRIES: usize = 10;

    let start_time = Instant::now();

    let retry_strategy = ExponentialBackoff::from_millis(100)
        .max_delay(Duration::from_secs(10))
        .map(jitter)
        .take(MAX_RETRIES);

    let result = Retry::spawn(retry_strategy, || async {
        // The memory graph is updated first; PageRank is only attempted once
        // that step has succeeded.
        if let Err(e) = repo_inner.update_memory_graph().await {
            error!("Memory graph update failed: {:?}", e);
            return Err(e);
        }

        if let Err(e) = repo_inner.update_pagerank().await {
            error!("Failed to update PageRank: {:?}", e);
            return Err(e);
        }

        Ok(())
    })
    .await;

    let elapsed = start_time.elapsed();

    match result {
        Ok(_) => info!("PageRank updated successfully in {:?}", elapsed),
        Err(e) => error!(
            "Failed to update PageRank after retries in {:?}: {:?}",
            elapsed, e
        ),
    }

    // Recorded on both success and failure so slow or failing runs are visible.
    metrics::pagerank_seconds().set(elapsed.as_secs_f64());
}

0 comments on commit 5e64ecc

Please sign in to comment.