Skip to content

Commit

Permalink
Drop receiver in handle_replies
Browse files Browse the repository at this point in the history
When `Message::Done` was received before the cleanup loop executed it
led to a deadlock since the loop didn't see the receiving end being
disconnected and it also had strong reference via `Arc` preventing it
from dropping.

This change solves the problem by splitting the channel and only keeping
the sender in `Connection`. The reciver is passed to `handle_replies` as
an extra argument, making sure it will be used exactly once.

The main advantage of this method over trying to keep the receiver
inside `Connection` is that it hanles cases when the thread panicks and
possibly forgetting explicit close in future refactors. Another
advantage is being able to remove one `Arc::clone` and I have an idea
for removing locking on `senders` in the future too.

Closes #283
  • Loading branch information
Kixunil committed Jul 28, 2020
1 parent 3f750cf commit 4764dcc
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use serde_json::{from_str, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::mpsc::{self, Sender, SyncSender, Receiver, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;

use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::query::{Query, Status};
use crate::util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
use crate::util::{spawn_thread, Channel, HeaderEntry};

const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: &str = "1.4";
Expand Down Expand Up @@ -74,7 +74,7 @@ struct Connection {
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: SyncChannel<Message>,
chan: SyncSender<Message>,
stats: Arc<Stats>,
relayfee: f64,
}
Expand All @@ -86,14 +86,15 @@ impl Connection {
addr: SocketAddr,
stats: Arc<Stats>,
relayfee: f64,
chan: SyncSender<Message>,
) -> Connection {
Connection {
query,
last_header_entry: None, // disable header subscription for now
status_hashes: HashMap::new(),
stream,
addr,
chan: SyncChannel::new(10),
chan,
stats,
relayfee,
}
Expand Down Expand Up @@ -251,7 +252,7 @@ impl Connection {
let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
let txid = self.query.broadcast(&tx)?;
self.query.update_mempool()?;
if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
if let Err(e) = self.chan.try_send(Message::PeriodicUpdate) {
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
}
Ok(json!(txid.to_hex()))
Expand Down Expand Up @@ -390,10 +391,10 @@ impl Connection {
Ok(())
}

fn handle_replies(&mut self) -> Result<()> {
fn handle_replies(&mut self, receiver: Receiver<Message>) -> Result<()> {
let empty_params = json!([]);
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
let msg = receiver.recv().chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
Expand Down Expand Up @@ -451,11 +452,11 @@ impl Connection {
}
}

pub fn run(mut self) {
pub fn run(mut self, receiver: Receiver<Message>) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let tx = self.chan.clone();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
if let Err(e) = self.handle_replies(receiver) {
error!(
"[{}] connection handling failed: {}",
self.addr,
Expand Down Expand Up @@ -571,15 +572,16 @@ impl RPC {
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
// explicitely scope the shadowed variables for the new thread
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();
let (sender, receiver) = mpsc::sync_channel(10);

senders.lock().unwrap().push(sender.clone());

let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats, relayfee);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
let conn = Connection::new(query, stream, addr, stats, relayfee, sender);
conn.run(receiver);
info!("[{}] disconnected peer", addr);
let _ = garbage_sender.send(std::thread::current().id());
});
Expand Down

0 comments on commit 4764dcc

Please sign in to comment.