rpc.rs 13 KB
Newer Older
kenshin-samourai's avatar
kenshin-samourai committed
1
use bitcoin::consensus::encode::serialize;
Roman Zeyde's avatar
Roman Zeyde committed
2 3
use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
4
use error_chain::ChainedError;
5
use hex;
6
use serde_json::{from_str, Value};
Roman Zeyde's avatar
Roman Zeyde committed
7
use std::collections::HashMap;
8
use std::io::{BufRead, BufReader, Write};
9
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
10
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
11
use std::sync::{Arc, Mutex};
12
use std::thread;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
13

Roman Zeyde's avatar
Roman Zeyde committed
14 15
use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
kenshin-samourai's avatar
kenshin-samourai committed
16
use crate::query::Query;
Roman Zeyde's avatar
Roman Zeyde committed
17
use crate::util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
18

19 20
const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: &str = "1.4";
21

Roman Zeyde's avatar
Roman Zeyde committed
22
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
23 24
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
25 26 27 28 29
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

30 31
struct Connection {
    query: Arc<Query>,
Roman Zeyde's avatar
Roman Zeyde committed
32
    last_header_entry: Option<HeaderEntry>,
33
    status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
34 35
    stream: TcpStream,
    addr: SocketAddr,
36
    chan: SyncChannel<Message>,
37
    stats: Arc<Stats>,
38 39
}

40
impl Connection {
41 42 43 44 45 46
    pub fn new(
        query: Arc<Query>,
        stream: TcpStream,
        addr: SocketAddr,
        stats: Arc<Stats>,
    ) -> Connection {
47
        Connection {
48
            query,
Roman Zeyde's avatar
Roman Zeyde committed
49
            last_header_entry: None, // disable header subscription for now
50
            status_hashes: HashMap::new(),
51 52
            stream,
            addr,
53
            chan: SyncChannel::new(10),
54
            stats,
55 56 57
        }
    }

58
    fn server_version(&self) -> Result<Value> {
59 60 61 62
        Ok(json!([
            format!("electrs {}", ELECTRS_VERSION),
            PROTOCOL_VERSION
        ]))
63
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
64

Roman Zeyde's avatar
Roman Zeyde committed
65
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
66
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
67
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
68
        Ok(json!(Value::Array(
69 70
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
71
                .into_iter()
Roman Zeyde's avatar
Roman Zeyde committed
72
                .map(|item| json!({"height": item.0, "tx_hash": item.1.to_hex()}))
Roman Zeyde's avatar
Roman Zeyde committed
73 74
                .collect()
        )))
75
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
76

77
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
78 79
        let timer = self
            .stats
80 81 82
            .latency
            .with_label_values(&[method])
            .start_timer();
83 84
        let result = match method {
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
85
            "server.ping" => Ok(Value::Null),
86
            "server.version" => self.server_version(),
87
            &_ => bail!("unknown method {} {:?}", method, params),
88
        };
89
        timer.observe_duration();
90
        // TODO: return application errors should be sent to the client
91 92 93
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
94 95 96 97 98 99 100
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
101 102 103
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
104 105
    }

Roman Zeyde's avatar
Roman Zeyde committed
106
    fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
Roman Zeyde's avatar
Roman Zeyde committed
107 108
        let timer = self
            .stats
109 110 111
            .latency
            .with_label_values(&["periodic_update"])
            .start_timer();
112
        let mut result = vec![];
Roman Zeyde's avatar
Roman Zeyde committed
113
        if let Some(ref mut last_entry) = self.last_header_entry {
114
            let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
115 116
            if *last_entry != entry {
                *last_entry = entry;
117
                let hex_header = hex::encode(serialize(last_entry.header()));
Roman Zeyde's avatar
Roman Zeyde committed
118
                let header = json!({"hex": hex_header, "height": last_entry.height()});
Roman Zeyde's avatar
Roman Zeyde committed
119 120 121 122 123
                result.push(json!({
                    "jsonrpc": "2.0",
                    "method": "blockchain.headers.subscribe",
                    "params": [header]}));
            }
124 125
        }
        for (script_hash, status_hash) in self.status_hashes.iter_mut() {
126
            let status = self.query.status(&script_hash[..])?;
127
            let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
128 129 130 131 132 133
            if new_status_hash == *status_hash {
                continue;
            }
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
Roman Zeyde's avatar
Roman Zeyde committed
134
                "params": [script_hash.to_hex(), new_status_hash]}));
135 136
            *status_hash = new_status_hash;
        }
137 138 139 140
        timer.observe_duration();
        self.stats
            .subscriptions
            .set(self.status_hashes.len() as i64);
141 142 143
        Ok(result)
    }

144 145 146 147 148 149 150 151
    fn send_values(&mut self, values: &[Value]) -> Result<()> {
        for value in values {
            let line = value.to_string() + "\n";
            self.stream
                .write_all(line.as_bytes())
                .chain_err(|| format!("failed to send {}", value))?;
        }
        Ok(())
152 153
    }

154
    fn handle_replies(&mut self) -> Result<()> {
155
        let empty_params = json!([]);
Roman Zeyde's avatar
Roman Zeyde committed
156
        loop {
157
            let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
158
            trace!("RPC {:?}", msg);
Roman Zeyde's avatar
Roman Zeyde committed
159 160 161
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
162 163 164 165 166
                    let reply = match (
                        cmd.get("method"),
                        cmd.get("params").unwrap_or_else(|| &empty_params),
                        cmd.get("id"),
                    ) {
Roman Zeyde's avatar
Roman Zeyde committed
167 168
                        (
                            Some(&Value::String(ref method)),
169
                            &Value::Array(ref params),
170
                            Some(ref id),
Roman Zeyde's avatar
Roman Zeyde committed
171 172 173
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
174
                    self.send_values(&[reply])?
175
                }
176
                Message::PeriodicUpdate => {
Roman Zeyde's avatar
Roman Zeyde committed
177 178
                    let values = self
                        .update_subscriptions()
179 180
                        .chain_err(|| "failed to update subscriptions")?;
                    self.send_values(&values)?
Roman Zeyde's avatar
Roman Zeyde committed
181
                }
182
                Message::Done => return Ok(()),
183
            }
Roman Zeyde's avatar
Roman Zeyde committed
184 185 186
        }
    }

187
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
188
        loop {
189
            let mut line = Vec::<u8>::new();
Roman Zeyde's avatar
Roman Zeyde committed
190
            reader
191 192
                .read_until(b'\n', &mut line)
                .chain_err(|| "failed to read a request")?;
Roman Zeyde's avatar
Roman Zeyde committed
193
            if line.is_empty() {
194 195
                tx.send(Message::Done).chain_err(|| "channel closed")?;
                return Ok(());
Roman Zeyde's avatar
Roman Zeyde committed
196
            } else {
197 198 199 200 201
                if line.starts_with(&[22, 3, 1]) {
                    // (very) naive SSL handshake detection
                    let _ = tx.send(Message::Done);
                    bail!("invalid request - maybe SSL-encrypted data?: {:?}", line)
                }
202
                match String::from_utf8(line) {
Roman Zeyde's avatar
Roman Zeyde committed
203 204
                    Ok(req) => tx
                        .send(Message::Request(req))
205 206 207
                        .chain_err(|| "channel closed")?,
                    Err(err) => {
                        let _ = tx.send(Message::Done);
208
                        bail!("invalid UTF8: {}", err)
209 210
                    }
                }
Roman Zeyde's avatar
Roman Zeyde committed
211 212 213 214
            }
        }
    }

215
    pub fn run(mut self) {
216
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
217
        let tx = self.chan.sender();
218
        let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
219
        if let Err(e) = self.handle_replies() {
220 221 222 223 224 225
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
226
        debug!("[{}] shutting down connection", self.addr);
227
        let _ = self.stream.shutdown(Shutdown::Both);
228 229
        if let Err(err) = child.join().expect("receiver panicked") {
            error!("[{}] receiver failed: {}", self.addr, err);
230
        }
231 232 233
    }
}

234
#[derive(Debug)]
235 236
pub enum Message {
    Request(String),
237
    PeriodicUpdate,
238 239 240
    Done,
}

241 242 243 244 245
pub enum Notification {
    Periodic,
    Exit,
}

246
pub struct RPC {
247
    notification: Sender<Notification>,
Roman Zeyde's avatar
Roman Zeyde committed
248
    server: Option<thread::JoinHandle<()>>, // so we can join the server while dropping this ojbect
249 250
}

251 252 253 254 255
struct Stats {
    latency: HistogramVec,
    subscriptions: Gauge,
}

256
impl RPC {
257 258
    fn start_notifier(
        notification: Channel<Notification>,
259
        senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
260
        acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
261
    ) {
262
        spawn_thread("notification", move || {
263
            for msg in notification.receiver().iter() {
264
                let mut senders = senders.lock().unwrap();
265
                match msg {
Roman Zeyde's avatar
Roman Zeyde committed
266 267 268 269 270 271 272 273
                    Notification::Periodic => {
                        for sender in senders.split_off(0) {
                            if let Err(TrySendError::Disconnected(_)) =
                                sender.try_send(Message::PeriodicUpdate)
                            {
                                continue;
                            }
                            senders.push(sender);
274
                        }
Roman Zeyde's avatar
Roman Zeyde committed
275
                    }
276
                    Notification::Exit => acceptor.send(None).unwrap(),
277 278
                }
            }
279
        });
280 281
    }

282
    fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
Roman Zeyde's avatar
Roman Zeyde committed
283
        let chan = Channel::unbounded();
284
        let acceptor = chan.sender();
285
        spawn_thread("acceptor", move || {
Roman Zeyde's avatar
Roman Zeyde committed
286 287
            let listener =
                TcpListener::bind(addr).unwrap_or_else(|e| panic!("bind({}) failed: {}", addr, e));
288 289 290 291
            info!(
                "Electrum RPC server running on {} (protocol {})",
                addr, PROTOCOL_VERSION
            );
292 293
            loop {
                let (stream, addr) = listener.accept().expect("accept failed");
Roman Zeyde's avatar
Roman Zeyde committed
294 295 296
                stream
                    .set_nonblocking(false)
                    .expect("failed to set connection as blocking");
297
                acceptor.send(Some((stream, addr))).expect("send failed");
298 299 300 301 302
            }
        });
        chan
    }

303 304
    pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
        let stats = Arc::new(Stats {
305
            latency: metrics.histogram_vec(
306
                HistogramOpts::new("electrs_electrum_rpc", "Electrum RPC latency (seconds)"),
307 308 309
                &["method"],
            ),
            subscriptions: metrics.gauge(MetricOpts::new(
310
                "electrs_electrum_subscriptions",
311 312 313
                "# of Electrum subscriptions",
            )),
        });
Roman Zeyde's avatar
Roman Zeyde committed
314
        let notification = Channel::unbounded();
Roman Zeyde's avatar
Roman Zeyde committed
315
        RPC {
316
            notification: notification.sender(),
Roman Zeyde's avatar
Roman Zeyde committed
317
            server: Some(spawn_thread("rpc", move || {
318 319 320
                let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
                let acceptor = RPC::start_acceptor(addr);
                RPC::start_notifier(notification, senders.clone(), acceptor.sender());
321
                let mut children = vec![];
322 323 324 325
                while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
                    let query = query.clone();
                    let senders = senders.clone();
                    let stats = stats.clone();
326
                    children.push(spawn_thread("peer", move || {
327 328 329 330 331
                        info!("[{}] connected peer", addr);
                        let conn = Connection::new(query, stream, addr, stats);
                        senders.lock().unwrap().push(conn.chan.sender());
                        conn.run();
                        info!("[{}] disconnected peer", addr);
332 333
                    }));
                }
334
                trace!("closing {} RPC connections", senders.lock().unwrap().len());
335 336 337
                for sender in senders.lock().unwrap().iter() {
                    let _ = sender.send(Message::Done);
                }
338
                trace!("waiting for {} RPC handling threads", children.len());
339 340
                for child in children {
                    let _ = child.join();
341
                }
Roman Zeyde's avatar
Roman Zeyde committed
342
                trace!("RPC connections are closed");
Roman Zeyde's avatar
Roman Zeyde committed
343
            })),
Roman Zeyde's avatar
Roman Zeyde committed
344
        }
345 346 347
    }

    pub fn notify(&self) {
348 349
        self.notification.send(Notification::Periodic).unwrap();
    }
Roman Zeyde's avatar
Roman Zeyde committed
350
}
351

Roman Zeyde's avatar
Roman Zeyde committed
352 353
impl Drop for RPC {
    fn drop(&mut self) {
Roman Zeyde's avatar
Roman Zeyde committed
354
        trace!("stop accepting new RPCs");
355
        self.notification.send(Notification::Exit).unwrap();
Roman Zeyde's avatar
Roman Zeyde committed
356 357 358
        if let Some(handle) = self.server.take() {
            handle.join().unwrap();
        }
Roman Zeyde's avatar
Roman Zeyde committed
359
        trace!("RPC server is stopped");
360
    }
Roman Zeyde's avatar
Roman Zeyde committed
361
}