Unverified Commit fbcb3855 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Schedule periodic RPC updates from main thread polling

parent 83513298
Electrum
========
* Poll mempool after transaction broadcast
* Update subscriptions after index/mempool update
* Snapshot DB after successful indexing - and run queries on the latest snapshot
* Update height to -1 for txns with any `unconfirmed input <https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status>`_
......
......@@ -15,7 +15,7 @@ use electrs::{app::{App, Waiter},
errors::*,
index::Index,
query::Query,
rpc,
rpc::RPC,
store::{DBStore, StoreOptions}};
fn run_server(config: &Config) -> Result<()> {
......@@ -39,13 +39,13 @@ fn run_server(config: &Config) -> Result<()> {
let app = App::new(store, index, daemon);
let query = Query::new(app.clone());
rpc::start(&config.rpc_addr, query.clone());
let rpc = RPC::start(config.rpc_addr, query.clone());
while let None = signal.wait() {
query.update_mempool()?;
if tip == app.daemon().getbestblockhash()? {
continue;
if tip != app.daemon().getbestblockhash()? {
tip = app.index().update(app.write_store(), app.daemon())?;
}
tip = app.index().update(app.write_store(), app.daemon())?;
rpc.notify();
}
info!("closing server");
Ok(())
......
......@@ -7,10 +7,9 @@ use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::Arc;
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use query::Query;
use util::HeaderEntry;
......@@ -50,6 +49,7 @@ struct Connection {
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: Channel,
}
impl Connection {
......@@ -60,6 +60,7 @@ impl Connection {
status_hashes: HashMap::new(),
stream,
addr,
chan: Channel::new(),
}
}
......@@ -263,15 +264,9 @@ impl Connection {
Ok(())
}
fn handle_replies(&mut self, chan: &Channel) -> Result<()> {
let poll_duration = Duration::from_secs(5);
let rx = chan.receiver();
fn handle_replies(&mut self) -> Result<()> {
loop {
let msg = match rx.recv_timeout(poll_duration) {
Ok(msg) => msg,
Err(RecvTimeoutError::Timeout) => Message::PeriodicUpdate,
Err(RecvTimeoutError::Disconnected) => bail!("channel closed"),
};
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
......@@ -317,10 +312,9 @@ impl Connection {
pub fn run(mut self) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let chan = Channel::new();
let tx = chan.sender();
let tx = self.chan.sender();
let child = thread::spawn(|| Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies(&chan) {
if let Err(e) = self.handle_replies() {
error!(
"[{}] connection handling failed: {}",
self.addr,
......@@ -361,16 +355,58 @@ impl Channel {
}
}
pub fn start(addr: &SocketAddr, query: Arc<Query>) -> thread::JoinHandle<()> {
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
thread::spawn(move || loop {
let (stream, addr) = listener.accept().expect("accept failed");
let query = query.clone();
pub struct RPC {
acceptor: thread::JoinHandle<()>,
notification: Sender<()>,
}
impl RPC {
fn start_notification_worker(
receiver: Receiver<()>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
) {
thread::spawn(move || {
info!("[{}] connected peer", addr);
Connection::new(query, stream, addr).run();
info!("[{}] disconnected peer", addr);
for _ in receiver.iter() {
let mut senders = senders.lock().unwrap();
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
}
}
});
})
}
pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
let (notification_tx, notification_rx) = channel();
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
let child = thread::spawn(move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
RPC::start_notification_worker(notification_rx, senders.clone());
loop {
let (stream, addr) = listener.accept().expect("accept failed");
let query = query.clone();
let senders = senders.clone();
thread::spawn(move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
});
}
});
RPC {
acceptor: child,
notification: notification_tx,
}
}
pub fn notify(&self) {
self.notification.send(()).unwrap();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment