Unverified Commit d82489c2 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Improve RPC error handing

parent 4404f120
......@@ -9,7 +9,7 @@ use itertools;
use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use query::{Query, Status};
......@@ -208,12 +208,10 @@ impl<'a> Handler<'a> {
"blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
&_ => bail!("unknown method {} {:?}", method, params),
}?;
let reply = json!({"jsonrpc": "2.0", "id": id, "result": result});
Ok(reply)
Ok(json!({"jsonrpc": "2.0", "id": id, "result": result}))
}
fn update_subscriptions(&mut self, blockhash: &Sha256dHash) -> Result<Vec<Value>> {
info!("block {} found", blockhash);
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
let mut result = vec![];
if self.headers_subscribe {
let entry = self.query
......@@ -240,65 +238,81 @@ impl<'a> Handler<'a> {
Ok(result)
}
pub fn run(mut self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) -> Result<()> {
let mut reader = BufReader::new(stream
.try_clone()
.chain_err(|| "failed to clone TcpStream")?);
let tx = chan.sender();
crossbeam::scope(|scope| {
scope.spawn(|| loop {
let mut line = String::new();
line.clear();
reader
.read_line(&mut line) // TODO: use .lines() iterator
.expect("failed to read a request");
if line.is_empty() {
tx.send(Message::Done).expect("channel closed");
break;
} else {
tx.send(Message::Request(line)).expect("channel closed");
fn handle_replies(
&mut self,
stream: &mut TcpStream,
addr: SocketAddr,
chan: &Channel,
) -> Result<()> {
let rx = chan.receiver();
loop {
let msg = rx.recv().chain_err(|| "channel closed")?;
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
info!("[{}] {}", addr, cmd);
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
Some(&Value::Array(ref params)),
Some(&Value::Number(ref id)),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
debug!("reply: {}", reply);
let line = reply.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send response")?;
}
});
let rx = chan.receiver();
loop {
let msg = rx.recv().chain_err(|| "channel closed")?;
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
Some(&Value::Array(ref params)),
Some(&Value::Number(ref id)),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
info!("[{}] {}", addr, cmd);
debug!("reply: {}", reply);
let line = reply.to_string() + "\n";
Message::Block(blockhash) => {
debug!("blockhash found: {}", blockhash);
for update in self.update_subscriptions()
.chain_err(|| "failed to get updates")?
{
debug!("update: {}", update);
let line = update.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send response")?;
}
Message::Block(blockhash) => {
for update in self.update_subscriptions(&blockhash)
.chain_err(|| "failed to get updates")?
{
debug!("update: {}", update);
let line = update.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send update")?;
}
.chain_err(|| "failed to send update")?;
}
Message::Done => break,
}
Message::Done => {
debug!("done");
break;
}
}
Ok(())
})
}
Ok(())
}
fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
loop {
let mut line = String::new();
reader
.read_line(&mut line) // TODO: use .lines() iterator
.expect("failed to read a request");
if line.is_empty() {
tx.send(Message::Done).expect("channel closed");
break;
} else {
tx.send(Message::Request(line)).expect("channel closed");
}
}
Ok(())
}
pub fn run(mut self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) {
let reader = BufReader::new(stream.try_clone().expect("failed to clone TcpStream"));
crossbeam::scope(|scope| {
let tx = chan.sender();
let reader = scope.spawn(|| Handler::handle_requests(reader, tx));
self.handle_replies(&mut stream, addr, chan)
.err()
.map(|e| log_error(&addr, e));
stream.shutdown(Shutdown::Both).expect("shutdown failed");
reader.join().err().map(|e| log_error(&addr, e));
});
}
}
......@@ -328,21 +342,20 @@ impl Channel {
}
}
pub fn serve(addr: &str, query: &Query, mut chan: Channel) {
pub fn serve(addr: &str, query: &Query, chan: Channel) {
let listener = TcpListener::bind(addr).unwrap();
info!("RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().unwrap();
info!("[{}] connected peer", addr);
let handler = Handler::new(query);
match handler.run(stream, addr, &mut chan) {
Ok(()) => info!("[{}] disconnected peer", addr),
Err(ref e) => {
error!("[{}] {}", addr, e);
for e in e.iter().skip(1) {
error!("caused by: {}", e);
}
}
}
Handler::new(query).run(stream, addr, &chan);
info!("[{}] disconnected peer", addr);
}
}
fn log_error(addr: &SocketAddr, e: Error) {
error!("[{}] {}", addr, e);
for e in e.iter().skip(1) {
error!("caused by: {}", e);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment