rpc.rs 13.7 KB
Newer Older
Roman Zeyde's avatar
Roman Zeyde committed
1
use bitcoin::blockdata::block::BlockHeader;
2 3
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::{deserialize, serialize};
4
use bitcoin::util::hash::Sha256dHash;
5
use error_chain::ChainedError;
6
use hex;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
7
use serde_json::{from_str, Number, Value};
Roman Zeyde's avatar
Roman Zeyde committed
8
use std::collections::HashMap;
9
use std::io::{BufRead, BufReader, Write};
10
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
11
use std::sync::Arc;
12
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
13
use std::thread;
14
use std::time::Duration;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
15

16
use query::Query;
Roman Zeyde's avatar
Roman Zeyde committed
17
use util::HeaderEntry;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
18

Roman Zeyde's avatar
Roman Zeyde committed
19
use errors::*;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
20

Roman Zeyde's avatar
Roman Zeyde committed
21
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
22 23
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
24 25 26 27 28
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

29
fn jsonify_header(header: &BlockHeader, height: usize) -> Value {
Roman Zeyde's avatar
Roman Zeyde committed
30 31 32 33 34 35 36 37 38 39 40
    json!({
        "block_height": height,
        "version": header.version,
        "prev_block_hash": header.prev_blockhash.be_hex_string(),
        "merkle_root": header.merkle_root.be_hex_string(),
        "timestamp": header.time,
        "bits": header.bits,
        "nonce": header.nonce
    })
}

41 42
struct Connection {
    query: Arc<Query>,
Roman Zeyde's avatar
Roman Zeyde committed
43
    last_header_entry: Option<HeaderEntry>,
44
    status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
45 46
    stream: TcpStream,
    addr: SocketAddr,
47 48
}

49 50
impl Connection {
    pub fn new(query: Arc<Query>, stream: TcpStream, addr: SocketAddr) -> Connection {
51
        Connection {
52
            query: query,
Roman Zeyde's avatar
Roman Zeyde committed
53
            last_header_entry: None, // disable header subscription for now
54
            status_hashes: HashMap::new(),
55 56
            stream,
            addr,
57 58 59 60
        }
    }

    fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
61
        let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
62
        self.last_header_entry = Some(entry.clone());
63
        Ok(jsonify_header(entry.header(), entry.height()))
64
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
65

66
    fn server_version(&self) -> Result<Value> {
67
        Ok(json!(["RustElectrum 0.1.0", "1.2"]))
68
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
69

70
    fn server_banner(&self) -> Result<Value> {
71
        Ok(json!("Welcome to RustElectrum Server!\n"))
72
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
73

74
    fn server_donation_address(&self) -> Result<Value> {
75
        Ok(Value::Null)
76
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
77

78 79 80
    fn server_peers_subscribe(&self) -> Result<Value> {
        Ok(json!([]))
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
81

82
    fn mempool_get_fee_histogram(&self) -> Result<Value> {
83
        Ok(json!(self.query.get_fee_histogram()))
84
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
85

86 87 88 89 90
    fn blockchain_block_get_chunk(&self, params: &[Value]) -> Result<Value> {
        const CHUNK_SIZE: usize = 2016;
        let index = params.get(0).chain_err(|| "missing index")?;
        let index = index.as_u64().chain_err(|| "non-number index")? as usize;
        let heights: Vec<usize> = (0..CHUNK_SIZE).map(|h| index * CHUNK_SIZE + h).collect();
Roman Zeyde's avatar
Roman Zeyde committed
91 92 93 94 95 96
        let headers: Vec<String> = self.query
            .get_headers(&heights)
            .into_iter()
            .map(|x| hex::encode(&serialize(&x).unwrap()))
            .collect();
        Ok(json!(headers.join("")))
97 98
    }

Roman Zeyde's avatar
Roman Zeyde committed
99 100 101 102
    fn blockchain_block_get_header(&self, params: &[Value]) -> Result<Value> {
        let height = params.get(0).chain_err(|| "missing height")?;
        let height = height.as_u64().chain_err(|| "non-number height")? as usize;
        let headers = self.query.get_headers(&vec![height]);
103
        Ok(json!(jsonify_header(&headers[0], height)))
Roman Zeyde's avatar
Roman Zeyde committed
104 105
    }

106 107 108 109 110
    fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
        let blocks = params.get(0).chain_err(|| "missing blocks")?;
        let blocks = blocks.as_u64().chain_err(|| "non-number blocks")? as usize;
        let fee_rate = self.query.estimate_fee(blocks); // in BTC/kB
        Ok(json!(fee_rate))
111
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
112

Roman Zeyde's avatar
Roman Zeyde committed
113
    fn blockchain_relayfee(&self) -> Result<Value> {
114
        Ok(json!(0.0)) // allow sending transactions with any fee.
Roman Zeyde's avatar
Roman Zeyde committed
115 116
    }

117
    fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
118
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
119
        let status = self.query.status(&script_hash[..])?;
120
        let result = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
121 122
        self.status_hashes.insert(script_hash, result.clone());
        Ok(result)
123
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
124

125
    fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
126
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
127
        let status = self.query.status(&script_hash[..])?;
128 129 130
        Ok(
            json!({ "confirmed": status.confirmed_balance(), "unconfirmed": status.mempool_balance() }),
        )
131
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
132

Roman Zeyde's avatar
Roman Zeyde committed
133
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
134
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
135
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
136
        Ok(json!(Value::Array(
137 138
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
139 140 141 142
                .into_iter()
                .map(|item| json!({"height": item.0, "tx_hash": item.1.be_hex_string()}))
                .collect()
        )))
143
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
144

145 146 147 148 149
    fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
        let tx = params.get(0).chain_err(|| "missing tx")?;
        let tx = tx.as_str().chain_err(|| "non-string tx")?;
        let tx = hex::decode(&tx).chain_err(|| "non-hex tx")?;
        let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
150
        let txid = self.query.broadcast(&tx)?;
151 152 153
        Ok(json!(txid.be_hex_string()))
    }

154
    fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
155
        // TODO: handle 'verbose' param
Roman Zeyde's avatar
Roman Zeyde committed
156
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
157
        let tx = self.query.get_tx(&tx_hash)?;
158
        Ok(json!(hex::encode(&serialize(&tx).unwrap())))
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
159 160
    }

Roman Zeyde's avatar
Roman Zeyde committed
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
    fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
        let height = params.get(1).chain_err(|| "missing height")?;
        let height = height.as_u64().chain_err(|| "non-number height")? as usize;
        let (merkle, pos) = self.query
            .get_merkle_proof(&tx_hash, height)
            .chain_err(|| "cannot create merkle proof")?;
        let merkle: Vec<String> = merkle
            .into_iter()
            .map(|txid| txid.be_hex_string())
            .collect();
        Ok(json!({
                "block_height": height,
                "merkle": merkle,
                "pos": pos}))
176
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
177

178
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
179 180 181 182 183 184 185
        let result = match method {
            "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
            "server.version" => self.server_version(),
            "server.banner" => self.server_banner(),
            "server.donation_address" => self.server_donation_address(),
            "server.peers.subscribe" => self.server_peers_subscribe(),
            "mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
186
            "blockchain.block.get_chunk" => self.blockchain_block_get_chunk(&params),
Roman Zeyde's avatar
Roman Zeyde committed
187
            "blockchain.block.get_header" => self.blockchain_block_get_header(&params),
188
            "blockchain.estimatefee" => self.blockchain_estimatefee(&params),
Roman Zeyde's avatar
Roman Zeyde committed
189
            "blockchain.relayfee" => self.blockchain_relayfee(),
190 191 192
            "blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
            "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
193
            "blockchain.transaction.broadcast" => self.blockchain_transaction_broadcast(&params),
194 195 196
            "blockchain.transaction.get" => self.blockchain_transaction_get(&params),
            "blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
            &_ => bail!("unknown method {} {:?}", method, params),
197
        };
198
        // TODO: return application errors should be sent to the client
199 200 201
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
202 203 204 205 206 207 208
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
209 210 211
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
212 213
    }

Roman Zeyde's avatar
Roman Zeyde committed
214
    fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
215
        let mut result = vec![];
Roman Zeyde's avatar
Roman Zeyde committed
216
        if let Some(ref mut last_entry) = self.last_header_entry {
217
            let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
218 219 220 221 222 223 224 225
            if *last_entry != entry {
                *last_entry = entry;
                let header = jsonify_header(last_entry.header(), last_entry.height());
                result.push(json!({
                    "jsonrpc": "2.0",
                    "method": "blockchain.headers.subscribe",
                    "params": [header]}));
            }
226 227
        }
        for (script_hash, status_hash) in self.status_hashes.iter_mut() {
228
            let status = self.query.status(&script_hash[..])?;
229
            let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
230 231 232 233 234 235 236 237 238 239 240 241
            if new_status_hash == *status_hash {
                continue;
            }
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
                "params": [script_hash.be_hex_string(), new_status_hash]}));
            *status_hash = new_status_hash;
        }
        Ok(result)
    }

242 243 244 245 246 247 248 249 250
    fn send_value(&mut self, v: Value) -> Result<()> {
        debug!("[{}] <- {}", self.addr, v);
        let line = v.to_string() + "\n";
        self.stream
            .write_all(line.as_bytes())
            .chain_err(|| format!("failed to send {}", v))
    }

    fn handle_replies(&mut self, chan: &Channel) -> Result<()> {
251
        let poll_duration = Duration::from_secs(5);
Roman Zeyde's avatar
Roman Zeyde committed
252 253
        let rx = chan.receiver();
        loop {
254 255 256 257 258
            let msg = match rx.recv_timeout(poll_duration) {
                Ok(msg) => msg,
                Err(RecvTimeoutError::Timeout) => Message::PeriodicUpdate,
                Err(RecvTimeoutError::Disconnected) => bail!("channel closed"),
            };
Roman Zeyde's avatar
Roman Zeyde committed
259 260 261
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
262
                    debug!("[{}] -> {}", self.addr, cmd);
Roman Zeyde's avatar
Roman Zeyde committed
263 264 265 266 267 268 269 270
                    let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
                        (
                            Some(&Value::String(ref method)),
                            Some(&Value::Array(ref params)),
                            Some(&Value::Number(ref id)),
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
271
                    self.send_value(reply)?
272
                }
273
                Message::PeriodicUpdate => {
Roman Zeyde's avatar
Roman Zeyde committed
274
                    for update in self.update_subscriptions()
275
                        .chain_err(|| "failed to update subscriptions")?
Roman Zeyde's avatar
Roman Zeyde committed
276
                    {
277
                        self.send_value(update)?
278
                    }
Roman Zeyde's avatar
Roman Zeyde committed
279 280 281 282
                }
                Message::Done => {
                    debug!("done");
                    break;
283
                }
284
            }
Roman Zeyde's avatar
Roman Zeyde committed
285 286 287 288
        }
        Ok(())
    }

289
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) {
Roman Zeyde's avatar
Roman Zeyde committed
290 291 292 293 294 295 296 297 298 299 300 301 302 303
        loop {
            let mut line = String::new();
            reader
                .read_line(&mut line)  // TODO: use .lines() iterator
                .expect("failed to read a request");
            if line.is_empty() {
                tx.send(Message::Done).expect("channel closed");
                break;
            } else {
                tx.send(Message::Request(line)).expect("channel closed");
            }
        }
    }

304
    pub fn run(mut self) {
305
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
306 307 308 309 310 311 312 313 314 315 316 317 318 319
        let chan = Channel::new();
        let tx = chan.sender();
        let child = thread::spawn(|| Connection::handle_requests(reader, tx));
        if let Err(e) = self.handle_replies(&chan) {
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
        let _ = self.stream.shutdown(Shutdown::Both);
        if child.join().is_err() {
            error!("[{}] receiver panicked", self.addr);
        }
320 321 322 323 324
    }
}

pub enum Message {
    Request(String),
325
    PeriodicUpdate,
326 327 328 329 330 331 332 333 334 335
    Done,
}

pub struct Channel {
    tx: SyncSender<Message>,
    rx: Receiver<Message>,
}

impl Channel {
    pub fn new() -> Channel {
336
        let (tx, rx) = sync_channel(10);
337 338 339 340 341 342 343 344 345
        Channel { tx, rx }
    }

    pub fn sender(&self) -> SyncSender<Message> {
        self.tx.clone()
    }

    pub fn receiver(&self) -> &Receiver<Message> {
        &self.rx
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
346 347 348
    }
}

349 350
pub fn start(addr: &SocketAddr, query: Arc<Query>) -> thread::JoinHandle<()> {
    let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
351
    info!("RPC server running on {}", addr);
352 353
    thread::spawn(move || loop {
        let (stream, addr) = listener.accept().expect("accept failed");
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
354
        info!("[{}] connected peer", addr);
355
        Connection::new(query.clone(), stream, addr).run();
Roman Zeyde's avatar
Roman Zeyde committed
356
        info!("[{}] disconnected peer", addr);
357
    })
Roman Zeyde's avatar
Roman Zeyde committed
358
}