Unverified Commit 87d32b75 authored by kenshin samourai's avatar kenshin samourai Committed by GitHub
Browse files

Merge pull request #4 from Samourai-Wallet/fix_threads_cleanup

clean up rpc threads after the connection is closed
parents 281859f7 4e8172d2
...@@ -2,6 +2,7 @@ use bitcoin_hashes::hex::{FromHex, ToHex}; ...@@ -2,6 +2,7 @@ use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash; use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use error_chain::ChainedError; use error_chain::ChainedError;
use serde_json::{from_str, Value}; use serde_json::{from_str, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::SyncSender; use std::sync::mpsc::SyncSender;
...@@ -219,30 +220,50 @@ impl RPC { ...@@ -219,30 +220,50 @@ impl RPC {
pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC { pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
RPC { RPC {
server: Some(spawn_thread("rpc", move || { server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new())); let senders = Arc::new(Mutex::new(HashMap::<i32, SyncSender<Message>>::new()));
let handles = Arc::new(Mutex::new(
HashMap::<i32, std::thread::JoinHandle<()>>::new(),
));
let acceptor = RPC::start_acceptor(addr); let acceptor = RPC::start_acceptor(addr);
let mut children = vec![]; let mut handle_count = 0;
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone(); let handle_id = handle_count;
let senders = senders.clone(); handle_count += 1;
children.push(spawn_thread("peer", move || { // explicitely scope the shadowed variables for the new thread
info!("[{}] connected peer", addr); let handle: thread::JoinHandle<()> = {
let conn = Connection::new(query, stream, addr); let query = Arc::clone(&query);
senders.lock().unwrap().push(conn.chan.sender()); let senders = Arc::clone(&senders);
conn.run(); let handles = Arc::clone(&handles);
info!("[{}] disconnected peer", addr);
})); spawn_thread("peer", move || {
info!("[{}] connected peer #{}", addr, handle_id);
let conn = Connection::new(query, stream, addr);
senders
.lock()
.unwrap()
.insert(handle_id, conn.chan.sender());
conn.run();
info!("[{}] disconnected peer #{}", addr, handle_id);
senders.lock().unwrap().remove(&handle_id);
handles.lock().unwrap().remove(&handle_id);
})
};
handles.lock().unwrap().insert(handle_id, handle);
} }
trace!("closing {} RPC connections", senders.lock().unwrap().len()); trace!("closing {} RPC connections", senders.lock().unwrap().len());
for sender in senders.lock().unwrap().iter() { for sender in senders.lock().unwrap().values() {
let _ = sender.send(Message::Done); let _ = sender.send(Message::Done);
} }
trace!("waiting for {} RPC handling threads", children.len()); trace!("waiting for {} RPC handling threads", handles.lock().unwrap().len());
for child in children { for (_, handle) in handles.lock().unwrap().drain() {
let _ = child.join(); if let Err(e) = handle.join() {
warn!("failed to join thread: {:?}", e);
}
} }
trace!("RPC connections are closed"); trace!("RPC connections are closed");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment