rpc.rs 15.4 KB
Newer Older
1 2
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::{deserialize, serialize};
3
use bitcoin::util::hash::Sha256dHash;
4
use error_chain::ChainedError;
5
use hex;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
6
use serde_json::{from_str, Number, Value};
Roman Zeyde's avatar
Roman Zeyde committed
7
use std::collections::HashMap;
8
use std::io::{BufRead, BufReader, Write};
9
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
10 11
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
12
use std::thread;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
13

14
use query::Query;
15
use util::{HeaderEntry, Timer};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
16

Roman Zeyde's avatar
Roman Zeyde committed
17
use errors::*;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
18

Roman Zeyde's avatar
Roman Zeyde committed
19
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
20 21
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
22 23 24 25 26
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

27 28 29 30 31 32
fn usize_from_value(val: Option<&Value>, name: &str) -> Result<usize> {
    let val = val.chain_err(|| format!("missing {}", name))?;
    let val = val.as_u64().chain_err(|| format!("non-integer {}", name))?;
    Ok(val as usize)
}

33 34
fn jsonify_header(entry: &HeaderEntry) -> Value {
    let header = entry.header();
Roman Zeyde's avatar
Roman Zeyde committed
35
    json!({
36
        "block_height": entry.height(),
Roman Zeyde's avatar
Roman Zeyde committed
37 38 39 40 41 42 43 44 45
        "version": header.version,
        "prev_block_hash": header.prev_blockhash.be_hex_string(),
        "merkle_root": header.merkle_root.be_hex_string(),
        "timestamp": header.time,
        "bits": header.bits,
        "nonce": header.nonce
    })
}

46 47
struct Connection {
    query: Arc<Query>,
Roman Zeyde's avatar
Roman Zeyde committed
48
    last_header_entry: Option<HeaderEntry>,
49
    status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
50 51
    stream: TcpStream,
    addr: SocketAddr,
52
    chan: Channel,
53 54
}

55 56
impl Connection {
    pub fn new(query: Arc<Query>, stream: TcpStream, addr: SocketAddr) -> Connection {
57
        Connection {
58
            query: query,
Roman Zeyde's avatar
Roman Zeyde committed
59
            last_header_entry: None, // disable header subscription for now
60
            status_hashes: HashMap::new(),
61 62
            stream,
            addr,
63
            chan: Channel::new(),
64 65 66 67
        }
    }

    fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
68
        let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
69 70 71 72
        let hex_header = hex::encode(serialize(entry.header()).unwrap());
        let result = json!({"hex": hex_header, "height": entry.height()});
        self.last_header_entry = Some(entry);
        Ok(result)
73
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
74

75
    fn server_version(&self) -> Result<Value> {
76
        Ok(json!(["RustElectrum 0.1.0", "1.2"]))
77
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
78

79
    fn server_banner(&self) -> Result<Value> {
80
        Ok(json!("Welcome to RustElectrum Server!\n"))
81
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
82

83
    fn server_donation_address(&self) -> Result<Value> {
84
        Ok(Value::Null)
85
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
86

87 88 89
    fn server_peers_subscribe(&self) -> Result<Value> {
        Ok(json!([]))
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
90

91
    fn mempool_get_fee_histogram(&self) -> Result<Value> {
92
        Ok(json!(self.query.get_fee_histogram()))
93
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
94

95 96 97 98
    fn blockchain_block_headers(&self, params: &[Value]) -> Result<Value> {
        let start_height = usize_from_value(params.get(0), "start_height")?;
        let count = usize_from_value(params.get(1), "count")?;
        let heights: Vec<usize> = (start_height..(start_height + count)).collect();
Roman Zeyde's avatar
Roman Zeyde committed
99 100 101
        let headers: Vec<String> = self.query
            .get_headers(&heights)
            .into_iter()
102
            .map(|entry| hex::encode(&serialize(entry.header()).unwrap()))
Roman Zeyde's avatar
Roman Zeyde committed
103
            .collect();
104 105 106 107 108
        Ok(json!({
            "count": headers.len(),
            "hex": headers.join(""),
            "max": 2016,
        }))
109 110
    }

Roman Zeyde's avatar
Roman Zeyde committed
111
    fn blockchain_block_get_header(&self, params: &[Value]) -> Result<Value> {
112
        let height = usize_from_value(params.get(0), "missing height")?;
113 114 115 116 117 118
        let mut entries = self.query.get_headers(&[height]);
        let entry = entries
            .pop()
            .chain_err(|| format!("missing header #{}", height))?;
        assert_eq!(entries.len(), 0);
        Ok(json!(jsonify_header(&entry)))
Roman Zeyde's avatar
Roman Zeyde committed
119 120
    }

121
    fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
122 123
        let blocks_count = usize_from_value(params.get(0), "blocks_count")?;
        let fee_rate = self.query.estimate_fee(blocks_count); // in BTC/kB
124
        Ok(json!(fee_rate))
125
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
126

Roman Zeyde's avatar
Roman Zeyde committed
127
    fn blockchain_relayfee(&self) -> Result<Value> {
128
        Ok(json!(0.0)) // allow sending transactions with any fee.
Roman Zeyde's avatar
Roman Zeyde committed
129 130
    }

131
    fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
132
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
133
        let status = self.query.status(&script_hash[..])?;
134
        let result = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
135 136
        self.status_hashes.insert(script_hash, result.clone());
        Ok(result)
137
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
138

139
    fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
140
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
141
        let status = self.query.status(&script_hash[..])?;
142 143 144
        Ok(
            json!({ "confirmed": status.confirmed_balance(), "unconfirmed": status.mempool_balance() }),
        )
145
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
146

Roman Zeyde's avatar
Roman Zeyde committed
147
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
148
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
149
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
150
        Ok(json!(Value::Array(
151 152
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
153 154 155 156
                .into_iter()
                .map(|item| json!({"height": item.0, "tx_hash": item.1.be_hex_string()}))
                .collect()
        )))
157
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
158

159 160 161 162 163
    fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
        let tx = params.get(0).chain_err(|| "missing tx")?;
        let tx = tx.as_str().chain_err(|| "non-string tx")?;
        let tx = hex::decode(&tx).chain_err(|| "non-hex tx")?;
        let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
164
        let txid = self.query.broadcast(&tx)?;
165 166 167
        Ok(json!(txid.be_hex_string()))
    }

168
    fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
169
        // TODO: handle 'verbose' param
Roman Zeyde's avatar
Roman Zeyde committed
170
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
171
        let tx = self.query.load_txn(&tx_hash, /*blockhash=*/ None)?;
172
        Ok(json!(hex::encode(&serialize(&tx).unwrap())))
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
173 174
    }

Roman Zeyde's avatar
Roman Zeyde committed
175 176
    fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
177
        let height = usize_from_value(params.get(1), "height")?;
Roman Zeyde's avatar
Roman Zeyde committed
178 179 180 181 182 183 184 185 186 187 188
        let (merkle, pos) = self.query
            .get_merkle_proof(&tx_hash, height)
            .chain_err(|| "cannot create merkle proof")?;
        let merkle: Vec<String> = merkle
            .into_iter()
            .map(|txid| txid.be_hex_string())
            .collect();
        Ok(json!({
                "block_height": height,
                "merkle": merkle,
                "pos": pos}))
189
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
190

191
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
192 193 194 195 196 197 198
        let result = match method {
            "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
            "server.version" => self.server_version(),
            "server.banner" => self.server_banner(),
            "server.donation_address" => self.server_donation_address(),
            "server.peers.subscribe" => self.server_peers_subscribe(),
            "mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
199
            "blockchain.block.headers" => self.blockchain_block_headers(&params),
Roman Zeyde's avatar
Roman Zeyde committed
200
            "blockchain.block.get_header" => self.blockchain_block_get_header(&params),
201
            "blockchain.estimatefee" => self.blockchain_estimatefee(&params),
Roman Zeyde's avatar
Roman Zeyde committed
202
            "blockchain.relayfee" => self.blockchain_relayfee(),
203 204 205
            "blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
            "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
206
            "blockchain.transaction.broadcast" => self.blockchain_transaction_broadcast(&params),
207 208 209
            "blockchain.transaction.get" => self.blockchain_transaction_get(&params),
            "blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
            &_ => bail!("unknown method {} {:?}", method, params),
210
        };
211
        // TODO: return application errors should be sent to the client
212 213 214
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
215 216 217 218 219 220 221
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
222 223 224
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
225 226
    }

Roman Zeyde's avatar
Roman Zeyde committed
227
    fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
228
        let mut result = vec![];
229
        let mut timer = Timer::new();
Roman Zeyde's avatar
Roman Zeyde committed
230
        if let Some(ref mut last_entry) = self.last_header_entry {
231
            let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
232 233
            if *last_entry != entry {
                *last_entry = entry;
Roman Zeyde's avatar
Roman Zeyde committed
234 235
                let hex_header = hex::encode(serialize(last_entry.header()).unwrap());
                let header = json!({"hex": hex_header, "height": last_entry.height()});
Roman Zeyde's avatar
Roman Zeyde committed
236 237 238 239 240
                result.push(json!({
                    "jsonrpc": "2.0",
                    "method": "blockchain.headers.subscribe",
                    "params": [header]}));
            }
241 242
        }
        for (script_hash, status_hash) in self.status_hashes.iter_mut() {
243
            let status = self.query.status(&script_hash[..])?;
244
            let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
245 246 247 248 249 250 251 252 253
            if new_status_hash == *status_hash {
                continue;
            }
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
                "params": [script_hash.be_hex_string(), new_status_hash]}));
            *status_hash = new_status_hash;
        }
254 255 256 257 258 259
        timer.tick("total");
        debug!(
            "updating {} subscriptions {:?}",
            self.status_hashes.len(),
            timer
        );
260 261 262
        Ok(result)
    }

263 264 265 266 267 268 269 270 271
    fn send_values(&mut self, values: &[Value]) -> Result<()> {
        for value in values {
            debug!("[{}] <- {}", self.addr, value);
            let line = value.to_string() + "\n";
            self.stream
                .write_all(line.as_bytes())
                .chain_err(|| format!("failed to send {}", value))?;
        }
        Ok(())
272 273
    }

274
    fn handle_replies(&mut self) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
275
        loop {
276
            let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
Roman Zeyde's avatar
Roman Zeyde committed
277 278 279
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
280
                    debug!("[{}] -> {}", self.addr, cmd);
Roman Zeyde's avatar
Roman Zeyde committed
281 282 283 284 285 286 287 288
                    let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
                        (
                            Some(&Value::String(ref method)),
                            Some(&Value::Array(ref params)),
                            Some(&Value::Number(ref id)),
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
289
                    self.send_values(&[reply])?
290
                }
291
                Message::PeriodicUpdate => {
292 293 294
                    let values = self.update_subscriptions()
                        .chain_err(|| "failed to update subscriptions")?;
                    self.send_values(&values)?
Roman Zeyde's avatar
Roman Zeyde committed
295 296 297 298
                }
                Message::Done => {
                    debug!("done");
                    break;
299
                }
300
            }
Roman Zeyde's avatar
Roman Zeyde committed
301 302 303 304
        }
        Ok(())
    }

305
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) {
Roman Zeyde's avatar
Roman Zeyde committed
306 307 308 309 310 311 312 313 314 315 316 317 318 319
        loop {
            let mut line = String::new();
            reader
                .read_line(&mut line)  // TODO: use .lines() iterator
                .expect("failed to read a request");
            if line.is_empty() {
                tx.send(Message::Done).expect("channel closed");
                break;
            } else {
                tx.send(Message::Request(line)).expect("channel closed");
            }
        }
    }

320
    pub fn run(mut self) {
321
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
322
        let tx = self.chan.sender();
323
        let child = thread::spawn(|| Connection::handle_requests(reader, tx));
324
        if let Err(e) = self.handle_replies() {
325 326 327 328 329 330
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
331
        info!("[{}] shutting down connection", self.addr);
332 333 334 335
        let _ = self.stream.shutdown(Shutdown::Both);
        if child.join().is_err() {
            error!("[{}] receiver panicked", self.addr);
        }
336 337 338 339 340
    }
}

pub enum Message {
    Request(String),
341
    PeriodicUpdate,
342 343 344 345 346 347 348 349 350 351
    Done,
}

pub struct Channel {
    tx: SyncSender<Message>,
    rx: Receiver<Message>,
}

impl Channel {
    pub fn new() -> Channel {
352
        let (tx, rx) = sync_channel(10);
353 354 355 356 357 358 359 360 361
        Channel { tx, rx }
    }

    pub fn sender(&self) -> SyncSender<Message> {
        self.tx.clone()
    }

    pub fn receiver(&self) -> &Receiver<Message> {
        &self.rx
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
362 363 364
    }
}

365 366 367 368 369 370 371 372 373
pub struct RPC {
    notification: Sender<()>,
}

impl RPC {
    fn start_notification_worker(
        receiver: Receiver<()>,
        senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
    ) {
374
        thread::spawn(move || {
375 376 377 378 379 380 381 382 383 384 385
            for _ in receiver.iter() {
                let mut senders = senders.lock().unwrap();
                for sender in senders.split_off(0) {
                    if let Err(TrySendError::Disconnected(_)) =
                        sender.try_send(Message::PeriodicUpdate)
                    {
                        continue;
                    }
                    senders.push(sender);
                }
            }
386
        });
387 388 389
    }

    pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
390
        let (tx, rx) = channel();
391 392
        let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
        info!("RPC server running on {}", addr);
393
        thread::spawn(move || {
394
            let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
395
            RPC::start_notification_worker(rx, senders.clone());
396 397 398 399 400 401 402 403 404 405 406 407 408
            loop {
                let (stream, addr) = listener.accept().expect("accept failed");
                let query = query.clone();
                let senders = senders.clone();
                thread::spawn(move || {
                    info!("[{}] connected peer", addr);
                    let conn = Connection::new(query, stream, addr);
                    senders.lock().unwrap().push(conn.chan.sender());
                    conn.run();
                    info!("[{}] disconnected peer", addr);
                });
            }
        });
409
        RPC { notification: tx }
410 411 412 413 414
    }

    pub fn notify(&self) {
        self.notification.send(()).unwrap();
    }
Roman Zeyde's avatar
Roman Zeyde committed
415
}