rpc.rs 12.8 KB
Newer Older
Roman Zeyde's avatar
Roman Zeyde committed
1 2
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::network::serialize::serialize;
3
use bitcoin::util::hash::Sha256dHash;
4 5 6
use crossbeam;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
7
use hex;
8
use itertools;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
9
use serde_json::{from_str, Number, Value};
Roman Zeyde's avatar
Roman Zeyde committed
10
use std::collections::HashMap;
11
use std::io::{BufRead, BufReader, Write};
12
use std::net::{SocketAddr, TcpListener, TcpStream};
13
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
14

Roman Zeyde's avatar
Roman Zeyde committed
15
use query::{Query, Status};
Roman Zeyde's avatar
Roman Zeyde committed
16
use types::FullHash;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
17 18 19

error_chain!{}

Roman Zeyde's avatar
Roman Zeyde committed
20
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
21 22
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
23 24 25 26 27
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

28 29
/// Lists the transactions referenced by `status` as `(height, txn_id)` pairs,
/// sorted in ascending order.
///
/// Funding entries are recorded first and spending entries second, so when both
/// mention the same transaction id the spending entry's height is the one kept.
fn history_from_status(status: &Status) -> Vec<(i32, Sha256dHash)> {
    let mut height_by_txid: HashMap<Sha256dHash, i32> = HashMap::new();
    height_by_txid.extend(status.funding.iter().map(|f| (f.txn_id, f.height)));
    height_by_txid.extend(status.spending.iter().map(|s| (s.txn_id, s.height)));

    let mut history: Vec<(i32, Sha256dHash)> = Vec::new();
    for (txn_id, height) in height_by_txid {
        history.push((height, txn_id));
    }
    history.sort();
    history
}

42
fn hash_from_status(status: &Status) -> Value {
Roman Zeyde's avatar
Roman Zeyde committed
43 44
    let txns = history_from_status(status);
    if txns.is_empty() {
45
        return Value::Null;
Roman Zeyde's avatar
Roman Zeyde committed
46 47 48 49 50 51 52 53 54
    }

    let mut hash = FullHash::default();
    let mut sha2 = Sha256::new();
    for (height, txn_id) in txns {
        let part = format!("{}:{}:", txn_id.be_hex_string(), height);
        sha2.input(part.as_bytes());
    }
    sha2.result(&mut hash);
55
    Value::String(hex::encode(&hash))
Roman Zeyde's avatar
Roman Zeyde committed
56 57
}

58
fn jsonify_header(header: &BlockHeader, height: usize) -> Value {
Roman Zeyde's avatar
Roman Zeyde committed
59 60 61 62 63 64 65 66 67 68 69
    json!({
        "block_height": height,
        "version": header.version,
        "prev_block_hash": header.prev_blockhash.be_hex_string(),
        "merkle_root": header.merkle_root.be_hex_string(),
        "timestamp": header.time,
        "bits": header.bits,
        "nonce": header.nonce
    })
}

70 71
struct Handler<'a> {
    query: &'a Query<'a>,
72 73
    headers_subscribe: bool,
    status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
74 75
}

76
impl<'a> Handler<'a> {
77 78 79 80 81 82 83 84 85
    /// Builds a handler backed by `query` with no subscriptions registered yet.
    pub fn new(query: &'a Query) -> Handler<'a> {
        let status_hashes = HashMap::new();
        Handler {
            query,
            headers_subscribe: false,
            status_hashes,
        }
    }

    fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
86 87 88
        let entry = self.query
            .get_best_header()
            .chain_err(|| "no headers found")?;
89 90
        self.headers_subscribe = true;
        Ok(jsonify_header(entry.header(), entry.height()))
91
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
92

93
    fn server_version(&self) -> Result<Value> {
94
        Ok(json!(["RustElectrum 0.1.0", "1.2"]))
95
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
96

97
    fn server_banner(&self) -> Result<Value> {
98
        Ok(json!("Welcome to RustElectrum Server!\n"))
99
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
100

101
    fn server_donation_address(&self) -> Result<Value> {
102
        Ok(Value::Null)
103
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
104

105 106 107
    fn server_peers_subscribe(&self) -> Result<Value> {
        Ok(json!([]))
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
108

109
    fn mempool_get_fee_histogram(&self) -> Result<Value> {
110
        Ok(json!(self.query.get_fee_histogram()))
111
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
112

113 114 115 116 117 118
    fn blockchain_block_get_chunk(&self, params: &[Value]) -> Result<Value> {
        const CHUNK_SIZE: usize = 2016;
        let index = params.get(0).chain_err(|| "missing index")?;
        let index = index.as_u64().chain_err(|| "non-number index")? as usize;
        let heights: Vec<usize> = (0..CHUNK_SIZE).map(|h| index * CHUNK_SIZE + h).collect();
        let headers = self.query.get_headers(&heights);
Roman Zeyde's avatar
Roman Zeyde committed
119 120 121
        let result = itertools::join(
            headers
                .into_iter()
122
                .map(|x| hex::encode(&serialize(&x).unwrap())),
Roman Zeyde's avatar
Roman Zeyde committed
123 124
            "",
        );
125 126 127
        Ok(json!(result))
    }

Roman Zeyde's avatar
Roman Zeyde committed
128 129 130 131
    fn blockchain_block_get_header(&self, params: &[Value]) -> Result<Value> {
        let height = params.get(0).chain_err(|| "missing height")?;
        let height = height.as_u64().chain_err(|| "non-number height")? as usize;
        let headers = self.query.get_headers(&vec![height]);
132
        Ok(json!(jsonify_header(&headers[0], height)))
Roman Zeyde's avatar
Roman Zeyde committed
133 134
    }

135
    fn blockchain_estimatefee(&self, _params: &[Value]) -> Result<Value> {
136
        Ok(json!(-1)) // see mempool_get_fee_histogram() instead.
137
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
138

Roman Zeyde's avatar
Roman Zeyde committed
139 140 141 142
    /// Handles `blockchain.relayfee` with a hard-coded value of `1e-5`.
    fn blockchain_relayfee(&self) -> Result<Value> {
        // TODO: consult with actual node
        let relay_fee = json!(1e-5);
        Ok(relay_fee)
    }

143
    fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
144
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
145
        let status = self.query.status(&script_hash[..]);
146 147 148
        let result = hash_from_status(&status);
        self.status_hashes.insert(script_hash, result.clone());
        Ok(result)
149
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
150

151
    fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
152
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
153 154
        let status = self.query.status(&script_hash[..]);
        Ok(json!({ "confirmed": status.balance })) // TODO: "unconfirmed"
155
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
156

Roman Zeyde's avatar
Roman Zeyde committed
157
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
158
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
159 160 161 162 163 164 165
        let status = self.query.status(&script_hash[..]);
        Ok(json!(Value::Array(
            history_from_status(&status)
                .into_iter()
                .map(|item| json!({"height": item.0, "tx_hash": item.1.be_hex_string()}))
                .collect()
        )))
166
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
167

168
    fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
169
        // TODO: handle 'verbose' param
Roman Zeyde's avatar
Roman Zeyde committed
170
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
171 172
        let tx = self.query.get_tx(&tx_hash);
        Ok(json!(hex::encode(&serialize(&tx).unwrap())))
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
173 174
    }

Roman Zeyde's avatar
Roman Zeyde committed
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
    fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
        let height = params.get(1).chain_err(|| "missing height")?;
        let height = height.as_u64().chain_err(|| "non-number height")? as usize;
        let (merkle, pos) = self.query
            .get_merkle_proof(&tx_hash, height)
            .chain_err(|| "cannot create merkle proof")?;
        let merkle: Vec<String> = merkle
            .into_iter()
            .map(|txid| txid.be_hex_string())
            .collect();
        Ok(json!({
                "block_height": height,
                "merkle": merkle,
                "pos": pos}))
190
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
191

192
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
193 194 195 196 197 198 199
        let result = match method {
            "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
            "server.version" => self.server_version(),
            "server.banner" => self.server_banner(),
            "server.donation_address" => self.server_donation_address(),
            "server.peers.subscribe" => self.server_peers_subscribe(),
            "mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
200
            "blockchain.block.get_chunk" => self.blockchain_block_get_chunk(&params),
Roman Zeyde's avatar
Roman Zeyde committed
201
            "blockchain.block.get_header" => self.blockchain_block_get_header(&params),
202
            "blockchain.estimatefee" => self.blockchain_estimatefee(&params),
Roman Zeyde's avatar
Roman Zeyde committed
203
            "blockchain.relayfee" => self.blockchain_relayfee(),
204 205 206 207 208 209 210
            "blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
            "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
            "blockchain.transaction.get" => self.blockchain_transaction_get(&params),
            "blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
            &_ => bail!("unknown method {} {:?}", method, params),
        }?;
Roman Zeyde's avatar
Roman Zeyde committed
211
        Ok(json!({"jsonrpc": "2.0", "id": id, "result": result}))
212 213
    }

Roman Zeyde's avatar
Roman Zeyde committed
214
    fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
        let mut result = vec![];
        if self.headers_subscribe {
            let entry = self.query
                .get_best_header()
                .chain_err(|| "no headers found")?;
            let header = jsonify_header(entry.header(), entry.height());
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.headers.subscribe",
                "params": [header]}));
        }
        for (script_hash, status_hash) in self.status_hashes.iter_mut() {
            let status = self.query.status(&script_hash[..]);
            let new_status_hash = hash_from_status(&status);
            if new_status_hash == *status_hash {
                continue;
            }
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
                "params": [script_hash.be_hex_string(), new_status_hash]}));
            *status_hash = new_status_hash;
        }
        Ok(result)
    }

Roman Zeyde's avatar
Roman Zeyde committed
241 242 243 244 245 246 247 248 249 250 251 252
    fn handle_replies(
        &mut self,
        stream: &mut TcpStream,
        addr: SocketAddr,
        chan: &Channel,
    ) -> Result<()> {
        let rx = chan.receiver();
        loop {
            let msg = rx.recv().chain_err(|| "channel closed")?;
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
253
                    debug!("[{}] -> {}", addr, cmd);
Roman Zeyde's avatar
Roman Zeyde committed
254 255 256 257 258 259 260 261
                    let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
                        (
                            Some(&Value::String(ref method)),
                            Some(&Value::Array(ref params)),
                            Some(&Value::Number(ref id)),
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
262
                    debug!("[{}] <- {}", addr, reply);
Roman Zeyde's avatar
Roman Zeyde committed
263 264 265 266
                    let line = reply.to_string() + "\n";
                    stream
                        .write_all(line.as_bytes())
                        .chain_err(|| "failed to send response")?;
267
                }
Roman Zeyde's avatar
Roman Zeyde committed
268 269 270 271 272 273 274
                Message::Block(blockhash) => {
                    debug!("blockhash found: {}", blockhash);
                    for update in self.update_subscriptions()
                        .chain_err(|| "failed to get updates")?
                    {
                        debug!("update: {}", update);
                        let line = update.to_string() + "\n";
275 276
                        stream
                            .write_all(line.as_bytes())
Roman Zeyde's avatar
Roman Zeyde committed
277
                            .chain_err(|| "failed to send update")?;
278
                    }
Roman Zeyde's avatar
Roman Zeyde committed
279 280 281 282
                }
                Message::Done => {
                    debug!("done");
                    break;
283
                }
284
            }
Roman Zeyde's avatar
Roman Zeyde committed
285 286 287 288
        }
        Ok(())
    }

289
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) {
Roman Zeyde's avatar
Roman Zeyde committed
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
        loop {
            let mut line = String::new();
            reader
                .read_line(&mut line)  // TODO: use .lines() iterator
                .expect("failed to read a request");
            if line.is_empty() {
                tx.send(Message::Done).expect("channel closed");
                break;
            } else {
                tx.send(Message::Request(line)).expect("channel closed");
            }
        }
    }

    pub fn run(mut self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) {
        let reader = BufReader::new(stream.try_clone().expect("failed to clone TcpStream"));
306
        // TODO: figure out graceful shutting down and error logging.
Roman Zeyde's avatar
Roman Zeyde committed
307 308
        crossbeam::scope(|scope| {
            let tx = chan.sender();
309 310
            scope.spawn(|| Handler::handle_requests(reader, tx));
            self.handle_replies(&mut stream, addr, chan).unwrap();
Roman Zeyde's avatar
Roman Zeyde committed
311
        });
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    }
}

/// Messages delivered to a connection handler through a `Channel`.
pub enum Message {
    /// A raw request line read from the client socket.
    Request(String),
    /// A block hash notification; the handler reacts by re-checking its
    /// subscriptions.
    Block(Sha256dHash),
    /// The request reader has finished (an empty read from the socket); the
    /// handler should stop.
    Done,
}

/// A message queue that keeps both of its endpoints together, so senders can be
/// cloned from it on demand while the receiver stays in one place.
pub struct Channel {
    // Sending half; `sender()` hands out clones of it.
    tx: SyncSender<Message>,
    // Receiving half; `receiver()` only lends it out by reference.
    rx: Receiver<Message>,
}

impl Channel {
    pub fn new() -> Channel {
328
        let (tx, rx) = sync_channel(10);
329 330 331 332 333 334 335 336 337
        Channel { tx, rx }
    }

    pub fn sender(&self) -> SyncSender<Message> {
        self.tx.clone()
    }

    pub fn receiver(&self) -> &Receiver<Message> {
        &self.rx
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
338 339 340
    }
}

Roman Zeyde's avatar
Roman Zeyde committed
341
pub fn serve(addr: &str, query: &Query, chan: Channel) {
342 343
    let listener = TcpListener::bind(addr).unwrap();
    info!("RPC server running on {}", addr);
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
344 345 346
    loop {
        let (stream, addr) = listener.accept().unwrap();
        info!("[{}] connected peer", addr);
Roman Zeyde's avatar
Roman Zeyde committed
347 348 349 350
        Handler::new(query).run(stream, addr, &chan);
        info!("[{}] disconnected peer", addr);
    }
}