Unverified Commit c2af5bc0 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Split accepting clients into a separate thread

parent 4aa0a45c
......@@ -7,13 +7,13 @@ use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use query::Query;
use util::HeaderEntry;
use util::{Channel, HeaderEntry, SyncChannel};
use errors::*;
......@@ -50,7 +50,7 @@ struct Connection {
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: Channel,
chan: SyncChannel<Message>,
stats: Arc<Stats>,
}
......@@ -67,7 +67,7 @@ impl Connection {
status_hashes: HashMap::new(),
stream,
addr,
chan: Channel::new(),
chan: SyncChannel::new(10),
stats,
}
}
......@@ -350,26 +350,6 @@ pub enum Message {
Done,
}
pub struct Channel {
tx: SyncSender<Message>,
rx: Receiver<Message>,
}
impl Channel {
pub fn new() -> Channel {
let (tx, rx) = sync_channel(10);
Channel { tx, rx }
}
pub fn sender(&self) -> SyncSender<Message> {
self.tx.clone()
}
pub fn receiver(&self) -> &Receiver<Message> {
&self.rx
}
}
pub struct RPC {
notification: Sender<()>,
}
......@@ -381,11 +361,11 @@ struct Stats {
impl RPC {
fn start_notification_worker(
receiver: Receiver<()>,
notification: Channel<()>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
) {
thread::spawn(move || {
for _ in receiver.iter() {
for _ in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
......@@ -399,6 +379,19 @@ impl RPC {
});
}
fn start_acceptor(addr: SocketAddr) -> Channel<(TcpStream, SocketAddr)> {
let chan = Channel::new();
let tx = chan.sender();
thread::spawn(move || {
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
loop {
let (stream, addr) = listener.accept().expect("accept failed");
tx.send((stream, addr)).expect("send failed");
}
});
chan
}
pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
let stats = Arc::new(Stats {
latency: metrics.histogram(
......@@ -410,14 +403,16 @@ impl RPC {
"# of Electrum subscriptions",
)),
});
let (tx, rx) = channel();
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
let notification = Channel::new();
let handle = RPC {
notification: notification.sender(),
};
info!("RPC server running on {}", addr);
thread::spawn(move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
RPC::start_notification_worker(rx, senders.clone());
loop {
let (stream, addr) = listener.accept().expect("accept failed");
RPC::start_notification_worker(notification, senders.clone());
let clients = RPC::start_acceptor(addr);
for (stream, addr) in clients.receiver().iter() {
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
......@@ -430,7 +425,7 @@ impl RPC {
});
}
});
RPC { notification: tx }
handle
}
pub fn notify(&self) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment