Skip to content

Commit

Permalink
Fix process shutdown (#3673)
Browse files Browse the repository at this point in the history
I noticed that stopping the Lemmy process with ctrl+c wasnt working
because the activity channel isnt properly closed. This is now fixed.

Later we should also move the channel from static into LemmyContext,
Im not doing that now to avoid conflicts with #3670.
  • Loading branch information
Nutomic authored Jul 20, 2023
1 parent 1a164a6 commit ccc1221
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
34 changes: 25 additions & 9 deletions crates/api_common/src/send_activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use futures::future::BoxFuture;
use lemmy_db_schema::source::post::Post;
use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex,
use tokio::{
sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
Mutex,
},
task::JoinHandle,
};

type MatchOutgoingActivitiesBoxed =
Expand All @@ -21,17 +24,22 @@ pub enum SendActivityData {
CreatePost(Post),
}

// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
// ctrl+c still works.
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
let weak_sender = sender.downgrade();
ActivityChannel {
sender,
weak_sender,
receiver: Mutex::new(receiver),
keepalive_sender: Mutex::new(Some(sender)),
}
});

pub struct ActivityChannel {
sender: UnboundedSender<SendActivityData>,
weak_sender: WeakUnboundedSender<SendActivityData>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
}

impl ActivityChannel {
Expand All @@ -49,10 +57,18 @@ impl ActivityChannel {
.get()
.expect("retrieve function pointer")(data, context)
.await?;
} else {
let lock = &ACTIVITY_CHANNEL.sender;
lock.send(data)?;
}
// could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
// not sure which way is more efficient
else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
sender.send(data)?;
}
Ok(())
}

pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
outgoing_activities_task.await??;
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use lemmy_api_common::{
context::LemmyContext,
lemmy_db_views::structs::SiteView,
request::build_user_agent,
send_activity::MATCH_OUTGOING_ACTIVITIES,
send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
utils::{
check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config,
Expand Down Expand Up @@ -227,7 +227,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.await?;

// Wait for outgoing apub sends to complete
outgoing_activities_task.await??;
ActivityChannel::close(outgoing_activities_task).await?;

Ok(())
}
Expand Down

0 comments on commit ccc1221

Please sign in to comment.