Unverified Commit 10c6daeb authored by Roman Zeyde's avatar Roman Zeyde
Browse files

React to new blocks' notifications

parent 318008ac
......@@ -54,9 +54,12 @@ fn run_server(config: Config) {
let query = query::Query::new(&store, &daemon, &index);
crossbeam::scope(|scope| {
scope.spawn(|| rpc::serve("localhost:50001", &query));
let chan = rpc::Channel::new();
let tx = chan.sender();
scope.spawn(|| rpc::serve("localhost:50001", &query, chan));
loop {
waiter.wait();
let blockhash = waiter.wait();
tx.send(rpc::Message::Block(blockhash)).unwrap();
if config.enable_indexing {
index.update(&store, &daemon);
}
......
......@@ -6,6 +6,7 @@ use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use log::Level;
use pbr;
use std::collections::VecDeque;
use std::io::{stderr, Stderr};
......@@ -384,7 +385,12 @@ impl Index {
.iter()
.filter(|entry| !indexed_headers.contains_key(&entry.hash()))
.collect();
info!(
log!(
if missing_headers.is_empty() {
Level::Debug
} else {
Level::Info
},
"height {}, best {} @ {} ({} left to index)",
current_headers.headers().len() - 1,
best_block_header.bitcoin_hash(),
......
......@@ -2,6 +2,7 @@
extern crate bincode;
extern crate bitcoin;
extern crate crossbeam;
extern crate crypto;
extern crate extfmt;
extern crate itertools;
......
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::network::serialize::serialize;
use bitcoin::util::hash::Sha256dHash;
use crossbeam;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use itertools;
use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use query::{Query, Status};
use index::FullHash;
use util;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
error_chain!{}
struct Handler<'a> {
query: &'a Query<'a>,
}
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
let script_hash = val.chain_err(|| "missing hash")?;
......@@ -69,6 +67,10 @@ fn format_header(header: &BlockHeader, height: usize) -> Value {
})
}
struct Handler<'a> {
query: &'a Query<'a>,
}
impl<'a> Handler<'a> {
fn blockchain_headers_subscribe(&self) -> Result<Value> {
let entry = self.query
......@@ -201,51 +203,92 @@ impl<'a> Handler<'a> {
Ok(reply)
}
pub fn run(self, mut stream: TcpStream, addr: SocketAddr) -> Result<()> {
pub fn run(self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) -> Result<()> {
let mut reader = BufReader::new(stream
.try_clone()
.chain_err(|| "failed to clone TcpStream")?);
let mut line = String::new();
loop {
line.clear();
reader
.read_line(&mut line)
.chain_err(|| "failed to read a request")?;
if line.is_empty() {
break;
let tx = chan.sender();
crossbeam::scope(|scope| {
scope.spawn(|| loop {
let mut line = String::new();
line.clear();
reader
.read_line(&mut line)
.expect("failed to read a request");
if line.is_empty() {
tx.send(Message::Done).expect("channel closed");
break;
} else {
tx.send(Message::Request(line)).expect("channel closed");
}
});
let rx = chan.receiver();
loop {
let msg = rx.recv().chain_err(|| "channel closed")?;
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
Some(&Value::Array(ref params)),
Some(&Value::Number(ref id)),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
info!("[{}] {}", addr, cmd);
debug!("reply: {}", reply);
let line = reply.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send response")?;
}
Message::Block(blockhash) => info!("block {} found", blockhash),
Message::Done => break,
}
}
let line = line.trim_right();
let cmd: Value = from_str(line).chain_err(|| "invalid JSON format")?;
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
Some(&Value::Array(ref params)),
Some(&Value::Number(ref id)),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
info!("[{}] {}", addr, cmd);
debug!("reply: {}", reply);
let line = reply.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send response")?;
}
Ok(())
Ok(())
})
}
}
pub enum Message {
Request(String),
Block(Sha256dHash),
Done,
}
pub struct Channel {
tx: SyncSender<Message>,
rx: Receiver<Message>,
}
impl Channel {
pub fn new() -> Channel {
let (tx, rx) = sync_channel(0);
Channel { tx, rx }
}
pub fn sender(&self) -> SyncSender<Message> {
self.tx.clone()
}
pub fn receiver(&self) -> &Receiver<Message> {
&self.rx
}
}
pub fn serve(addr: &str, query: &Query) {
pub fn serve(addr: &str, query: &Query, mut chan: Channel) {
let listener = TcpListener::bind(addr).unwrap();
info!("RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().unwrap();
info!("[{}] connected peer", addr);
let handler = Handler { query };
match handler.run(stream, addr) {
match handler.run(stream, addr, &mut chan) {
Ok(()) => info!("[{}] disconnected peer", addr),
Err(ref e) => {
error!("[{}] {}", addr, e);
......
use bitcoin::network::serialize::deserialize;
use bitcoin::util::hash::Sha256dHash;
use zmq;
use types::Bytes;
pub struct Waiter {
sock: zmq::Socket,
}
......@@ -15,9 +15,9 @@ impl Waiter {
Waiter { sock }
}
pub fn wait(&self) -> Bytes {
pub fn wait(&self) -> Sha256dHash {
let mut blockhash = self.sock.recv_multipart(0).unwrap().remove(1);
blockhash.reverse(); // block hash needs to be LSB-first
blockhash
deserialize(&blockhash).unwrap()
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment