rpc.rs 10.5 KB
Newer Older
1
use bitcoin::consensus::encode::serialize;
Roman Zeyde's avatar
Roman Zeyde committed
2 3
use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
4
use error_chain::ChainedError;
5
use serde_json::{from_str, Value};
6
use std::collections::HashMap;
7
use std::io::{BufRead, BufReader, Write};
8
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
9
use std::sync::mpsc::SyncSender;
10
use std::sync::{Arc, Mutex};
11
use std::thread;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
12

Roman Zeyde's avatar
Roman Zeyde committed
13
use crate::errors::*;
kenshin-samourai's avatar
kenshin-samourai committed
14
use crate::query::Query;
15
use crate::util::{spawn_thread, Channel, SyncChannel};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
16

kenshin-samourai's avatar
kenshin-samourai committed
17
// Indexer version
18
const ADDRINDEXRS_VERSION: &str = env!("CARGO_PKG_VERSION");
kenshin-samourai's avatar
kenshin-samourai committed
19
// Version of the simulated electrum protocol
20
const PROTOCOL_VERSION: &str = "1.4";
21

kenshin-samourai's avatar
kenshin-samourai committed
22 23 24
//
// Get a script hash from a given value
//
Roman Zeyde's avatar
Roman Zeyde committed
25
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
kenshin-samourai's avatar
kenshin-samourai committed
26
    // TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
27
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
28 29 30 31 32
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

kenshin-samourai's avatar
kenshin-samourai committed
33 34 35
//
// Connection with a RPC client
//
36 37
struct Connection {
    query: Arc<Query>,
38 39
    stream: TcpStream,
    addr: SocketAddr,
40
    chan: SyncChannel<Message>,
41 42
}

43
impl Connection {
44 45 46 47 48
    pub fn new(
        query: Arc<Query>,
        stream: TcpStream,
        addr: SocketAddr,
    ) -> Connection {
49
        Connection {
50
            query,
51 52
            stream,
            addr,
53
            chan: SyncChannel::new(10),
54 55 56
        }
    }

57
    fn server_version(&self) -> Result<Value> {
58
        Ok(json!([
59
            format!("addrindexrs {}", ADDRINDEXRS_VERSION),
60 61
            PROTOCOL_VERSION
        ]))
62
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
63

64 65 66 67 68 69 70
    fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
        let entry = self.query.get_best_header()?;
        let hex_header = hex::encode(serialize(entry.header()));
        let result = json!({"hex": hex_header, "height": entry.height()});
        Ok(result)
    }

71 72 73 74 75 76
    fn blockchain_scripthash_get_balance(&self, _params: &[Value]) -> Result<Value> {
        Ok(
            json!({ "confirmed": null, "unconfirmed": null }),
        )
    }

Roman Zeyde's avatar
Roman Zeyde committed
77
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
78
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
79
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
80
        Ok(json!(Value::Array(
81 82
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
83
                .into_iter()
kenshin-samourai's avatar
kenshin-samourai committed
84
                .map(|item| json!({"tx_hash": item.to_hex()}))
Roman Zeyde's avatar
Roman Zeyde committed
85 86
                .collect()
        )))
87
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
88

89
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
90
        let result = match method {
91
            "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
92
            "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
93
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
94
            "server.ping" => Ok(Value::Null),
95
            "server.version" => self.server_version(),
96
            &_ => bail!("unknown method {} {:?}", method, params),
97
        };
98
        // TODO: return application errors should be sent to the client
99 100 101
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
102 103 104 105 106 107 108
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
109 110 111
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
112 113
    }

114 115 116 117 118 119 120 121
    fn send_values(&mut self, values: &[Value]) -> Result<()> {
        for value in values {
            let line = value.to_string() + "\n";
            self.stream
                .write_all(line.as_bytes())
                .chain_err(|| format!("failed to send {}", value))?;
        }
        Ok(())
122 123
    }

124
    fn handle_replies(&mut self) -> Result<()> {
125
        let empty_params = json!([]);
Roman Zeyde's avatar
Roman Zeyde committed
126
        loop {
127
            let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
128
            trace!("RPC {:?}", msg);
Roman Zeyde's avatar
Roman Zeyde committed
129 130 131
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
132 133 134 135 136
                    let reply = match (
                        cmd.get("method"),
                        cmd.get("params").unwrap_or_else(|| &empty_params),
                        cmd.get("id"),
                    ) {
Roman Zeyde's avatar
Roman Zeyde committed
137 138
                        (
                            Some(&Value::String(ref method)),
139
                            &Value::Array(ref params),
140
                            Some(ref id),
Roman Zeyde's avatar
Roman Zeyde committed
141 142 143
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
144
                    self.send_values(&[reply])?
145
                }
146
                Message::Done => return Ok(()),
147
            }
Roman Zeyde's avatar
Roman Zeyde committed
148 149 150
        }
    }

151
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
152
        loop {
153
            let mut line = Vec::<u8>::new();
Roman Zeyde's avatar
Roman Zeyde committed
154
            reader
155 156
                .read_until(b'\n', &mut line)
                .chain_err(|| "failed to read a request")?;
Roman Zeyde's avatar
Roman Zeyde committed
157
            if line.is_empty() {
158 159
                tx.send(Message::Done).chain_err(|| "channel closed")?;
                return Ok(());
Roman Zeyde's avatar
Roman Zeyde committed
160
            } else {
161 162 163 164 165
                if line.starts_with(&[22, 3, 1]) {
                    // (very) naive SSL handshake detection
                    let _ = tx.send(Message::Done);
                    bail!("invalid request - maybe SSL-encrypted data?: {:?}", line)
                }
166
                match String::from_utf8(line) {
Roman Zeyde's avatar
Roman Zeyde committed
167 168
                    Ok(req) => tx
                        .send(Message::Request(req))
169 170 171
                        .chain_err(|| "channel closed")?,
                    Err(err) => {
                        let _ = tx.send(Message::Done);
172
                        bail!("invalid UTF8: {}", err)
173 174
                    }
                }
Roman Zeyde's avatar
Roman Zeyde committed
175 176 177 178
            }
        }
    }

179
    pub fn run(mut self) {
180
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
181
        let tx = self.chan.sender();
182
        let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
183
        if let Err(e) = self.handle_replies() {
184 185 186 187 188 189
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
190
        debug!("[{}] shutting down connection", self.addr);
191
        let _ = self.stream.shutdown(Shutdown::Both);
192 193
        if let Err(err) = child.join().expect("receiver panicked") {
            error!("[{}] receiver failed: {}", self.addr, err);
194
        }
195 196 197
    }
}

kenshin-samourai's avatar
kenshin-samourai committed
198 199 200
//
// Messages supported by the RPC API
//
201
#[derive(Debug)]
202 203 204 205 206
pub enum Message {
    Request(String),
    Done,
}

kenshin-samourai's avatar
kenshin-samourai committed
207 208 209
//
// RPC server
//
210
pub struct RPC {
Roman Zeyde's avatar
Roman Zeyde committed
211
    server: Option<thread::JoinHandle<()>>, // so we can join the server while dropping this ojbect
212 213 214
}

impl RPC {
215
    fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
Roman Zeyde's avatar
Roman Zeyde committed
216
        let chan = Channel::unbounded();
217
        let acceptor = chan.sender();
218
        spawn_thread("acceptor", move || {
Roman Zeyde's avatar
Roman Zeyde committed
219 220
            let listener =
                TcpListener::bind(addr).unwrap_or_else(|e| panic!("bind({}) failed: {}", addr, e));
221
            info!(
kenshin-samourai's avatar
kenshin-samourai committed
222
                "Indexer RPC server running on {} (protocol {})",
223 224
                addr, PROTOCOL_VERSION
            );
225 226
            loop {
                let (stream, addr) = listener.accept().expect("accept failed");
Roman Zeyde's avatar
Roman Zeyde committed
227 228 229
                stream
                    .set_nonblocking(false)
                    .expect("failed to set connection as blocking");
230
                acceptor.send(Some((stream, addr))).expect("send failed");
231 232 233 234 235
            }
        });
        chan
    }

236
    pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
Roman Zeyde's avatar
Roman Zeyde committed
237
        RPC {
Roman Zeyde's avatar
Roman Zeyde committed
238
            server: Some(spawn_thread("rpc", move || {
239 240 241 242 243
                let senders = Arc::new(Mutex::new(HashMap::<i32, SyncSender<Message>>::new()));
                let handles = Arc::new(Mutex::new(
                    HashMap::<i32, std::thread::JoinHandle<()>>::new(),
                ));

244
                let acceptor = RPC::start_acceptor(addr);
245
                let mut handle_count = 0;
kenshin-samourai's avatar
kenshin-samourai committed
246

247
                while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
                    let handle_id = handle_count;
                    handle_count += 1;
                    // explicitely scope the shadowed variables for the new thread
                    let handle: thread::JoinHandle<()> = {
                        let query = Arc::clone(&query);
                        let senders = Arc::clone(&senders);
                        let handles = Arc::clone(&handles);

                        spawn_thread("peer", move || {
                            info!("[{}] connected peer #{}", addr, handle_id);
                            let conn = Connection::new(query, stream, addr);
                            senders
                                .lock()
                                .unwrap()
                                .insert(handle_id, conn.chan.sender());
                            conn.run();
                            info!("[{}] disconnected peer #{}", addr, handle_id);
                            senders.lock().unwrap().remove(&handle_id);
                            handles.lock().unwrap().remove(&handle_id);
                        })
                    };

                    handles.lock().unwrap().insert(handle_id, handle);
271
                }
kenshin-samourai's avatar
kenshin-samourai committed
272

273
                trace!("closing {} RPC connections", senders.lock().unwrap().len());
274
                for sender in senders.lock().unwrap().values() {
275 276
                    let _ = sender.send(Message::Done);
                }
kenshin-samourai's avatar
kenshin-samourai committed
277

278 279 280 281 282
                trace!("waiting for {} RPC handling threads", handles.lock().unwrap().len());
                for (_, handle) in handles.lock().unwrap().drain() {
                    if let Err(e) = handle.join() {
                        warn!("failed to join thread: {:?}", e);
                    }
283
                }
kenshin-samourai's avatar
kenshin-samourai committed
284

Roman Zeyde's avatar
Roman Zeyde committed
285
                trace!("RPC connections are closed");
Roman Zeyde's avatar
Roman Zeyde committed
286
            })),
Roman Zeyde's avatar
Roman Zeyde committed
287
        }
288
    }
Roman Zeyde's avatar
Roman Zeyde committed
289
}
290

Roman Zeyde's avatar
Roman Zeyde committed
291 292
impl Drop for RPC {
    fn drop(&mut self) {
Roman Zeyde's avatar
Roman Zeyde committed
293
        trace!("stop accepting new RPCs");
Roman Zeyde's avatar
Roman Zeyde committed
294 295 296
        if let Some(handle) = self.server.take() {
            handle.join().unwrap();
        }
Roman Zeyde's avatar
Roman Zeyde committed
297
        trace!("RPC server is stopped");
298
    }
Roman Zeyde's avatar
Roman Zeyde committed
299
}