Unverified Commit 165b48cc authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Gracefully close Electrum RPC server from after SIGINT

parent c2af5bc0
......@@ -44,6 +44,7 @@ fn run_server(config: &Config) -> Result<()> {
}
rpc.notify();
}
rpc.exit();
Ok(())
}
......
......@@ -350,8 +350,14 @@ pub enum Message {
Done,
}
pub enum Notification {
Periodic,
Exit,
}
pub struct RPC {
notification: Sender<()>,
notification: Sender<Notification>,
server: thread::JoinHandle<()>,
}
struct Stats {
......@@ -360,33 +366,38 @@ struct Stats {
}
impl RPC {
fn start_notification_worker(
notification: Channel<()>,
fn start_notifier(
notification: Channel<Notification>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
) {
thread::spawn(move || {
for _ in notification.receiver().iter() {
for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
match msg {
Notification::Periodic => for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
},
Notification::Exit => acceptor.send(None).unwrap(),
}
}
});
}
fn start_acceptor(addr: SocketAddr) -> Channel<(TcpStream, SocketAddr)> {
fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
let chan = Channel::new();
let tx = chan.sender();
thread::spawn(move || {
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().expect("accept failed");
tx.send((stream, addr)).expect("send failed");
tx.send(Some((stream, addr))).expect("send failed");
}
});
chan
......@@ -406,29 +417,34 @@ impl RPC {
let notification = Channel::new();
let handle = RPC {
notification: notification.sender(),
server: thread::spawn(move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
thread::spawn(move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
});
}
info!("stopping RPC server")
}),
};
info!("RPC server running on {}", addr);
thread::spawn(move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
RPC::start_notification_worker(notification, senders.clone());
let clients = RPC::start_acceptor(addr);
for (stream, addr) in clients.receiver().iter() {
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
thread::spawn(move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
});
}
});
handle
}
pub fn notify(&self) {
self.notification.send(()).unwrap();
self.notification.send(Notification::Periodic).unwrap();
}
pub fn exit(self) {
self.notification.send(Notification::Exit).unwrap();
self.server.join().unwrap();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment