rpc.rs 8.64 KB
Newer Older
Roman Zeyde's avatar
Roman Zeyde committed
1 2
use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
3
use error_chain::ChainedError;
4
use serde_json::{from_str, Value};
5
use std::io::{BufRead, BufReader, Write};
6
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
7
use std::sync::mpsc::SyncSender;
8
use std::sync::{Arc, Mutex};
9
use std::thread;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
10

Roman Zeyde's avatar
Roman Zeyde committed
11
use crate::errors::*;
kenshin-samourai's avatar
kenshin-samourai committed
12
use crate::query::Query;
13
use crate::util::{spawn_thread, Channel, SyncChannel};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
14

15 16
/// Version of this crate, taken from Cargo's `CARGO_PKG_VERSION` at compile time.
/// Reported to clients as part of the `server.version` reply.
const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Protocol version string reported in the `server.version` reply and in the
/// startup log message.
const PROTOCOL_VERSION: &str = "1.4";
17

18

Roman Zeyde's avatar
Roman Zeyde committed
19
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
20 21
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
22 23 24 25 26
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

27 28
struct Connection {
    query: Arc<Query>,
29 30
    stream: TcpStream,
    addr: SocketAddr,
31
    chan: SyncChannel<Message>,
32 33
}

34
impl Connection {
35 36 37 38 39
    pub fn new(
        query: Arc<Query>,
        stream: TcpStream,
        addr: SocketAddr,
    ) -> Connection {
40
        Connection {
41
            query,
42 43
            stream,
            addr,
44
            chan: SyncChannel::new(10),
45 46 47
        }
    }

48
    fn server_version(&self) -> Result<Value> {
49 50 51 52
        Ok(json!([
            format!("electrs {}", ELECTRS_VERSION),
            PROTOCOL_VERSION
        ]))
53
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
54

Roman Zeyde's avatar
Roman Zeyde committed
55
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
56
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
57
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
58
        Ok(json!(Value::Array(
59 60
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
61
                .into_iter()
Roman Zeyde's avatar
Roman Zeyde committed
62
                .map(|item| json!({"height": item.0, "tx_hash": item.1.to_hex()}))
Roman Zeyde's avatar
Roman Zeyde committed
63 64
                .collect()
        )))
65
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
66

67
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
68 69
        let result = match method {
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
70
            "server.ping" => Ok(Value::Null),
71
            "server.version" => self.server_version(),
72
            &_ => bail!("unknown method {} {:?}", method, params),
73
        };
74
        // TODO: return application errors should be sent to the client
75 76 77
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
78 79 80 81 82 83 84
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
85 86 87
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
88 89
    }

90 91 92 93 94 95 96 97
    fn send_values(&mut self, values: &[Value]) -> Result<()> {
        for value in values {
            let line = value.to_string() + "\n";
            self.stream
                .write_all(line.as_bytes())
                .chain_err(|| format!("failed to send {}", value))?;
        }
        Ok(())
98 99
    }

100
    fn handle_replies(&mut self) -> Result<()> {
101
        let empty_params = json!([]);
Roman Zeyde's avatar
Roman Zeyde committed
102
        loop {
103
            let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
104
            trace!("RPC {:?}", msg);
Roman Zeyde's avatar
Roman Zeyde committed
105 106 107
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
108 109 110 111 112
                    let reply = match (
                        cmd.get("method"),
                        cmd.get("params").unwrap_or_else(|| &empty_params),
                        cmd.get("id"),
                    ) {
Roman Zeyde's avatar
Roman Zeyde committed
113 114
                        (
                            Some(&Value::String(ref method)),
115
                            &Value::Array(ref params),
116
                            Some(ref id),
Roman Zeyde's avatar
Roman Zeyde committed
117 118 119
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
120
                    self.send_values(&[reply])?
121
                }
122
                Message::Done => return Ok(()),
123
            }
Roman Zeyde's avatar
Roman Zeyde committed
124 125 126
        }
    }

127
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
128
        loop {
129
            let mut line = Vec::<u8>::new();
Roman Zeyde's avatar
Roman Zeyde committed
130
            reader
131 132
                .read_until(b'\n', &mut line)
                .chain_err(|| "failed to read a request")?;
Roman Zeyde's avatar
Roman Zeyde committed
133
            if line.is_empty() {
134 135
                tx.send(Message::Done).chain_err(|| "channel closed")?;
                return Ok(());
Roman Zeyde's avatar
Roman Zeyde committed
136
            } else {
137 138 139 140 141
                if line.starts_with(&[22, 3, 1]) {
                    // (very) naive SSL handshake detection
                    let _ = tx.send(Message::Done);
                    bail!("invalid request - maybe SSL-encrypted data?: {:?}", line)
                }
142
                match String::from_utf8(line) {
Roman Zeyde's avatar
Roman Zeyde committed
143 144
                    Ok(req) => tx
                        .send(Message::Request(req))
145 146 147
                        .chain_err(|| "channel closed")?,
                    Err(err) => {
                        let _ = tx.send(Message::Done);
148
                        bail!("invalid UTF8: {}", err)
149 150
                    }
                }
Roman Zeyde's avatar
Roman Zeyde committed
151 152 153 154
            }
        }
    }

155
    pub fn run(mut self) {
156
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
157
        let tx = self.chan.sender();
158
        let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
159
        if let Err(e) = self.handle_replies() {
160 161 162 163 164 165
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
166
        debug!("[{}] shutting down connection", self.addr);
167
        let _ = self.stream.shutdown(Shutdown::Both);
168 169
        if let Err(err) = child.join().expect("receiver panicked") {
            error!("[{}] receiver failed: {}", self.addr, err);
170
        }
171 172 173
    }
}

174
/// Messages delivered to a connection's reply loop.
#[derive(Debug)]
pub enum Message {
    /// One request line as received from the peer.
    Request(String),
    /// Tells the reply loop to stop.
    Done,
}

180
/// Handle to the running RPC server.
pub struct RPC {
    /// Server thread handle, kept so the thread can be joined when this object is dropped.
    server: Option<thread::JoinHandle<()>>,
}

impl RPC {
185
    fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
Roman Zeyde's avatar
Roman Zeyde committed
186
        let chan = Channel::unbounded();
187
        let acceptor = chan.sender();
188
        spawn_thread("acceptor", move || {
Roman Zeyde's avatar
Roman Zeyde committed
189 190
            let listener =
                TcpListener::bind(addr).unwrap_or_else(|e| panic!("bind({}) failed: {}", addr, e));
191 192 193 194
            info!(
                "Electrum RPC server running on {} (protocol {})",
                addr, PROTOCOL_VERSION
            );
195 196
            loop {
                let (stream, addr) = listener.accept().expect("accept failed");
Roman Zeyde's avatar
Roman Zeyde committed
197 198 199
                stream
                    .set_nonblocking(false)
                    .expect("failed to set connection as blocking");
200
                acceptor.send(Some((stream, addr))).expect("send failed");
201 202 203 204 205
            }
        });
        chan
    }

206
    pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
Roman Zeyde's avatar
Roman Zeyde committed
207
        RPC {
Roman Zeyde's avatar
Roman Zeyde committed
208
            server: Some(spawn_thread("rpc", move || {
209 210
                let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
                let acceptor = RPC::start_acceptor(addr);
211
                let mut children = vec![];
212 213 214
                while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
                    let query = query.clone();
                    let senders = senders.clone();
215
                    children.push(spawn_thread("peer", move || {
216
                        info!("[{}] connected peer", addr);
217
                        let conn = Connection::new(query, stream, addr);
218 219 220
                        senders.lock().unwrap().push(conn.chan.sender());
                        conn.run();
                        info!("[{}] disconnected peer", addr);
221 222
                    }));
                }
223
                trace!("closing {} RPC connections", senders.lock().unwrap().len());
224 225 226
                for sender in senders.lock().unwrap().iter() {
                    let _ = sender.send(Message::Done);
                }
227
                trace!("waiting for {} RPC handling threads", children.len());
228 229
                for child in children {
                    let _ = child.join();
230
                }
Roman Zeyde's avatar
Roman Zeyde committed
231
                trace!("RPC connections are closed");
Roman Zeyde's avatar
Roman Zeyde committed
232
            })),
Roman Zeyde's avatar
Roman Zeyde committed
233
        }
234
    }
Roman Zeyde's avatar
Roman Zeyde committed
235
}
236

Roman Zeyde's avatar
Roman Zeyde committed
237 238
impl Drop for RPC {
    fn drop(&mut self) {
Roman Zeyde's avatar
Roman Zeyde committed
239
        trace!("stop accepting new RPCs");
Roman Zeyde's avatar
Roman Zeyde committed
240 241 242
        if let Some(handle) = self.server.take() {
            handle.join().unwrap();
        }
Roman Zeyde's avatar
Roman Zeyde committed
243
        trace!("RPC server is stopped");
244
    }
Roman Zeyde's avatar
Roman Zeyde committed
245
}