Unverified Commit 92bfe88b authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Handle block and script_hash subscription

parent 10c6daeb
......@@ -39,10 +39,10 @@ fn history_from_status(status: &Status) -> Vec<(u32, Sha256dHash)> {
txns
}
fn hash_from_status(status: &Status) -> Option<FullHash> {
fn hash_from_status(status: &Status) -> Value {
let txns = history_from_status(status);
if txns.is_empty() {
return None;
return Value::Null;
}
let mut hash = FullHash::default();
......@@ -52,10 +52,10 @@ fn hash_from_status(status: &Status) -> Option<FullHash> {
sha2.input(part.as_bytes());
}
sha2.result(&mut hash);
Some(hash)
Value::String(util::hexlify(&hash))
}
fn format_header(header: &BlockHeader, height: usize) -> Value {
fn jsonify_header(header: &BlockHeader, height: usize) -> Value {
json!({
"block_height": height,
"version": header.version,
......@@ -69,14 +69,25 @@ fn format_header(header: &BlockHeader, height: usize) -> Value {
struct Handler<'a> {
query: &'a Query<'a>,
headers_subscribe: bool,
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
}
impl<'a> Handler<'a> {
fn blockchain_headers_subscribe(&self) -> Result<Value> {
pub fn new(query: &'a Query) -> Handler<'a> {
Handler {
query: query,
headers_subscribe: false,
status_hashes: HashMap::new(),
}
}
fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
let entry = self.query
.get_best_header()
.chain_err(|| "no headers found")?;
Ok(format_header(entry.header(), entry.height()))
self.headers_subscribe = true;
Ok(jsonify_header(entry.header(), entry.height()))
}
fn server_version(&self) -> Result<Value> {
......@@ -118,7 +129,7 @@ impl<'a> Handler<'a> {
let height = params.get(0).chain_err(|| "missing height")?;
let height = height.as_u64().chain_err(|| "non-number height")? as usize;
let headers = self.query.get_headers(&vec![height]);
Ok(json!(format_header(&headers[0], height)))
Ok(json!(jsonify_header(&headers[0], height)))
}
fn blockchain_estimatefee(&self, _params: &[Value]) -> Result<Value> {
......@@ -129,14 +140,12 @@ impl<'a> Handler<'a> {
Ok(json!(1e-5)) // TODO: consult with actual node
}
fn blockchain_scripthash_subscribe(&self, params: &[Value]) -> Result<Value> {
fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let status = self.query.status(&script_hash[..]);
Ok(match hash_from_status(&status) {
Some(hash) => Value::String(util::hexlify(&hash)),
None => Value::Null,
})
let result = hash_from_status(&status);
self.status_hashes.insert(script_hash, result.clone());
Ok(result)
}
fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
......@@ -180,7 +189,7 @@ impl<'a> Handler<'a> {
"pos": pos}))
}
fn handle_command(&self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
let result = match method {
"blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
"server.version" => self.server_version(),
......@@ -203,7 +212,35 @@ impl<'a> Handler<'a> {
Ok(reply)
}
pub fn run(self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) -> Result<()> {
fn update_subscriptions(&mut self, blockhash: &Sha256dHash) -> Result<Vec<Value>> {
info!("block {} found", blockhash);
let mut result = vec![];
if self.headers_subscribe {
let entry = self.query
.get_best_header()
.chain_err(|| "no headers found")?;
let header = jsonify_header(entry.header(), entry.height());
result.push(json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [header]}));
}
for (script_hash, status_hash) in self.status_hashes.iter_mut() {
let status = self.query.status(&script_hash[..]);
let new_status_hash = hash_from_status(&status);
if new_status_hash == *status_hash {
continue;
}
result.push(json!({
"jsonrpc": "2.0",
"method": "blockchain.scripthash.subscribe",
"params": [script_hash.be_hex_string(), new_status_hash]}));
*status_hash = new_status_hash;
}
Ok(result)
}
pub fn run(mut self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) -> Result<()> {
let mut reader = BufReader::new(stream
.try_clone()
.chain_err(|| "failed to clone TcpStream")?);
......@@ -246,7 +283,17 @@ impl<'a> Handler<'a> {
.write_all(line.as_bytes())
.chain_err(|| "failed to send response")?;
}
Message::Block(blockhash) => info!("block {} found", blockhash),
Message::Block(blockhash) => {
for update in self.update_subscriptions(&blockhash)
.chain_err(|| "failed to get updates")?
{
debug!("update: {}", update);
let line = update.to_string() + "\n";
stream
.write_all(line.as_bytes())
.chain_err(|| "failed to send update")?;
}
}
Message::Done => break,
}
}
......@@ -287,7 +334,7 @@ pub fn serve(addr: &str, query: &Query, mut chan: Channel) {
loop {
let (stream, addr) = listener.accept().unwrap();
info!("[{}] connected peer", addr);
let handler = Handler { query };
let handler = Handler::new(query);
match handler.run(stream, addr, &mut chan) {
Ok(()) => info!("[{}] disconnected peer", addr),
Err(ref e) => {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment