rpc.rs 21.8 KB
Newer Older
1
use bitcoin::blockdata::transaction::Transaction;
2
use bitcoin::consensus::encode::{deserialize, serialize};
Roman Zeyde's avatar
Roman Zeyde committed
3 4
use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
5
use error_chain::ChainedError;
6
use hex;
7
use serde_json::{from_str, Value};
Roman Zeyde's avatar
Roman Zeyde committed
8
use std::collections::HashMap;
9
use std::io::{BufRead, BufReader, Write};
10
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
11
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
12
use std::sync::{Arc, Mutex};
13
use std::thread;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
14

Roman Zeyde's avatar
Roman Zeyde committed
15 16 17 18
use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::query::{Query, Status};
use crate::util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
19

20 21
const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: &str = "1.4";
22

Roman Zeyde's avatar
Roman Zeyde committed
23
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
24 25
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
26 27 28 29 30
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

31 32 33 34 35 36
fn usize_from_value(val: Option<&Value>, name: &str) -> Result<usize> {
    let val = val.chain_err(|| format!("missing {}", name))?;
    let val = val.as_u64().chain_err(|| format!("non-integer {}", name))?;
    Ok(val as usize)
}

37 38 39 40 41 42 43
/// Like `usize_from_value`, but yields `default` when the parameter is absent.
///
/// A parameter that is present but invalid is still reported as an error.
fn usize_from_value_or(val: Option<&Value>, name: &str, default: usize) -> Result<usize> {
    match val {
        None => Ok(default),
        present => usize_from_value(present, name),
    }
}

44 45 46
/// Extracts the boolean parameter called `name`.
///
/// Fails with "missing {name}" when the parameter is absent and with
/// "not a bool {name}" when it is not a JSON boolean.
fn bool_from_value(val: Option<&Value>, name: &str) -> Result<bool> {
    val.chain_err(|| format!("missing {}", name))?
        .as_bool()
        .chain_err(|| format!("not a bool {}", name))
}

fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result<bool> {
    if val.is_none() {
        return Ok(default);
    }
    bool_from_value(val, name)
55 56
}

57 58 59 60 61
fn unspent_from_status(status: &Status) -> Value {
    json!(Value::Array(
        status
            .unspent()
            .into_iter()
Roman Zeyde's avatar
Roman Zeyde committed
62
            .map(|out| json!({
63 64
                "height": out.height,
                "tx_pos": out.output_index,
Roman Zeyde's avatar
Roman Zeyde committed
65
                "tx_hash": out.txn_id.to_hex(),
66
                "value": out.value,
Roman Zeyde's avatar
Roman Zeyde committed
67 68
            }))
            .collect()
69 70 71
    ))
}

72 73
struct Connection {
    query: Arc<Query>,
Roman Zeyde's avatar
Roman Zeyde committed
74
    last_header_entry: Option<HeaderEntry>,
75
    status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
76 77
    stream: TcpStream,
    addr: SocketAddr,
78
    chan: SyncChannel<Message>,
79
    stats: Arc<Stats>,
80 81
}

82
impl Connection {
83 84 85 86 87 88
    pub fn new(
        query: Arc<Query>,
        stream: TcpStream,
        addr: SocketAddr,
        stats: Arc<Stats>,
    ) -> Connection {
89
        Connection {
90
            query,
Roman Zeyde's avatar
Roman Zeyde committed
91
            last_header_entry: None, // disable header subscription for now
92
            status_hashes: HashMap::new(),
93 94
            stream,
            addr,
95
            chan: SyncChannel::new(10),
96
            stats,
97 98 99 100
        }
    }

    fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
101
        let entry = self.query.get_best_header()?;
102
        let hex_header = hex::encode(serialize(entry.header()));
Roman Zeyde's avatar
Roman Zeyde committed
103 104 105
        let result = json!({"hex": hex_header, "height": entry.height()});
        self.last_header_entry = Some(entry);
        Ok(result)
106
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
107

108
    fn server_version(&self) -> Result<Value> {
109 110 111 112
        Ok(json!([
            format!("electrs {}", ELECTRS_VERSION),
            PROTOCOL_VERSION
        ]))
113
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
114

115
    fn server_banner(&self) -> Result<Value> {
116
        Ok(json!(self.query.get_banner()?))
117
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
118

119
    fn server_donation_address(&self) -> Result<Value> {
120
        Ok(Value::Null)
121
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
122

123 124 125
    /// Handles `server.peers.subscribe`: always replies with an empty JSON array.
    fn server_peers_subscribe(&self) -> Result<Value> {
        Ok(json!([]))
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
126

127
    fn mempool_get_fee_histogram(&self) -> Result<Value> {
128
        Ok(json!(self.query.get_fee_histogram()))
129
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
130

131 132
    fn blockchain_block_header(&self, params: &[Value]) -> Result<Value> {
        let height = usize_from_value(params.get(0), "height")?;
133
        let cp_height = usize_from_value_or(params.get(1), "cp_height", 0)?;
134 135

        let raw_header_hex: String = self
136 137 138 139 140
            .query
            .get_headers(&[height])
            .into_iter()
            .map(|entry| hex::encode(&serialize(entry.header())))
            .collect();
141 142 143 144 145 146

        if cp_height == 0 {
            return Ok(json!(raw_header_hex));
        }
        let (branch, root) = self.query.get_header_merkle_proof(height, cp_height)?;

Roman Zeyde's avatar
Roman Zeyde committed
147
        let branch_vec: Vec<String> = branch.into_iter().map(|b| b.to_hex()).collect();
148

Roman Zeyde's avatar
Roman Zeyde committed
149
        Ok(json!({
150
            "header": raw_header_hex,
Roman Zeyde's avatar
Roman Zeyde committed
151
            "root": root.to_hex(),
152
            "branch": branch_vec
Roman Zeyde's avatar
Roman Zeyde committed
153
        }))
154 155
    }

156 157 158
    fn blockchain_block_headers(&self, params: &[Value]) -> Result<Value> {
        let start_height = usize_from_value(params.get(0), "start_height")?;
        let count = usize_from_value(params.get(1), "count")?;
159
        let cp_height = usize_from_value_or(params.get(2), "cp_height", 0)?;
160
        let heights: Vec<usize> = (start_height..(start_height + count)).collect();
Roman Zeyde's avatar
Roman Zeyde committed
161 162
        let headers: Vec<String> = self
            .query
Roman Zeyde's avatar
Roman Zeyde committed
163 164
            .get_headers(&heights)
            .into_iter()
165
            .map(|entry| hex::encode(&serialize(entry.header())))
Roman Zeyde's avatar
Roman Zeyde committed
166
            .collect();
167 168 169 170 171 172 173 174 175 176 177 178 179

        if count == 0 || cp_height == 0 {
            return Ok(json!({
                "count": headers.len(),
                "hex": headers.join(""),
                "max": 2016,
            }));
        }

        let (branch, root) = self
            .query
            .get_header_merkle_proof(start_height + (count - 1), cp_height)?;

Roman Zeyde's avatar
Roman Zeyde committed
180
        let branch_vec: Vec<String> = branch.into_iter().map(|b| b.to_hex()).collect();
181

182 183 184 185
        Ok(json!({
            "count": headers.len(),
            "hex": headers.join(""),
            "max": 2016,
Roman Zeyde's avatar
Roman Zeyde committed
186
            "root": root.to_hex(),
187
            "branch" : branch_vec
188
        }))
189 190
    }

191
    fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
192 193
        let blocks_count = usize_from_value(params.get(0), "blocks_count")?;
        let fee_rate = self.query.estimate_fee(blocks_count); // in BTC/kB
194
        Ok(json!(fee_rate))
195
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
196

Roman Zeyde's avatar
Roman Zeyde committed
197
    fn blockchain_relayfee(&self) -> Result<Value> {
198
        Ok(json!(0.0)) // allow sending transactions with any fee.
Roman Zeyde's avatar
Roman Zeyde committed
199 200
    }

201
    fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
202
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
203
        let status = self.query.status(&script_hash[..])?;
204
        let result = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
205 206
        self.status_hashes.insert(script_hash, result.clone());
        Ok(result)
207
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
208

209
    fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
210
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
211
        let status = self.query.status(&script_hash[..])?;
212 213 214
        Ok(
            json!({ "confirmed": status.confirmed_balance(), "unconfirmed": status.mempool_balance() }),
        )
215
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
216

Roman Zeyde's avatar
Roman Zeyde committed
217
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
218
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
219
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
220
        Ok(json!(Value::Array(
221 222
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
223
                .into_iter()
Roman Zeyde's avatar
Roman Zeyde committed
224
                .map(|item| json!({"height": item.0, "tx_hash": item.1.to_hex()}))
Roman Zeyde's avatar
Roman Zeyde committed
225 226
                .collect()
        )))
227
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
228

229 230
    fn blockchain_scripthash_listunspent(&self, params: &[Value]) -> Result<Value> {
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
231 232 233
        Ok(unspent_from_status(&self.query.status(&script_hash[..])?))
    }

234 235 236 237 238
    fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
        let tx = params.get(0).chain_err(|| "missing tx")?;
        let tx = tx.as_str().chain_err(|| "non-string tx")?;
        let tx = hex::decode(&tx).chain_err(|| "non-hex tx")?;
        let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
239
        let txid = self.query.broadcast(&tx)?;
240 241 242 243
        self.query.update_mempool()?;
        if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
            warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
        }
Roman Zeyde's avatar
Roman Zeyde committed
244
        Ok(json!(txid.to_hex()))
245 246
    }

247
    fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
248
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
249 250 251 252 253
        let verbose = match params.get(1) {
            Some(value) => value.as_bool().chain_err(|| "non-bool verbose value")?,
            None => false,
        };
        Ok(self.query.get_transaction(&tx_hash, verbose)?)
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
254 255
    }

Roman Zeyde's avatar
Roman Zeyde committed
256 257
    fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
258
        let height = usize_from_value(params.get(1), "height")?;
Roman Zeyde's avatar
Roman Zeyde committed
259 260
        let (merkle, pos) = self
            .query
Roman Zeyde's avatar
Roman Zeyde committed
261 262
            .get_merkle_proof(&tx_hash, height)
            .chain_err(|| "cannot create merkle proof")?;
Roman Zeyde's avatar
Roman Zeyde committed
263
        let merkle: Vec<String> = merkle.into_iter().map(|txid| txid.to_hex()).collect();
Roman Zeyde's avatar
Roman Zeyde committed
264 265 266 267
        Ok(json!({
                "block_height": height,
                "merkle": merkle,
                "pos": pos}))
268
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
269

270 271 272
    fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result<Value> {
        let height = usize_from_value(params.get(0), "height")?;
        let tx_pos = usize_from_value(params.get(1), "tx_pos")?;
273
        let want_merkle = bool_from_value_or(params.get(2), "merkle", false)?;
274 275 276 277

        let (txid, merkle) = self.query.get_id_from_pos(height, tx_pos, want_merkle)?;

        if !want_merkle {
Roman Zeyde's avatar
Roman Zeyde committed
278
            return Ok(json!(txid.to_hex()));
279 280
        }

Roman Zeyde's avatar
Roman Zeyde committed
281
        let merkle_vec: Vec<String> = merkle.into_iter().map(|entry| entry.to_hex()).collect();
282 283

        Ok(json!({
Roman Zeyde's avatar
Roman Zeyde committed
284
            "tx_hash" : txid.to_hex(),
285 286 287
            "merkle" : merkle_vec}))
    }

288
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
289 290
        let timer = self
            .stats
291 292 293
            .latency
            .with_label_values(&[method])
            .start_timer();
294
        let result = match method {
295
            "blockchain.block.header" => self.blockchain_block_header(&params),
296
            "blockchain.block.headers" => self.blockchain_block_headers(&params),
297
            "blockchain.estimatefee" => self.blockchain_estimatefee(&params),
298
            "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
Roman Zeyde's avatar
Roman Zeyde committed
299
            "blockchain.relayfee" => self.blockchain_relayfee(),
300 301
            "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
302
            "blockchain.scripthash.listunspent" => self.blockchain_scripthash_listunspent(&params),
303
            "blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
304
            "blockchain.transaction.broadcast" => self.blockchain_transaction_broadcast(&params),
305 306
            "blockchain.transaction.get" => self.blockchain_transaction_get(&params),
            "blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
307 308 309
            "blockchain.transaction.id_from_pos" => {
                self.blockchain_transaction_id_from_pos(&params)
            }
310 311 312 313
            "mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
            "server.banner" => self.server_banner(),
            "server.donation_address" => self.server_donation_address(),
            "server.peers.subscribe" => self.server_peers_subscribe(),
314
            "server.ping" => Ok(Value::Null),
315
            "server.version" => self.server_version(),
316
            &_ => bail!("unknown method {} {:?}", method, params),
317
        };
318
        timer.observe_duration();
319
        // TODO: return application errors should be sent to the client
320 321 322
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
323 324 325 326 327 328 329
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
330 331 332
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
333 334
    }

Roman Zeyde's avatar
Roman Zeyde committed
335
    fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
Roman Zeyde's avatar
Roman Zeyde committed
336 337
        let timer = self
            .stats
338 339 340
            .latency
            .with_label_values(&["periodic_update"])
            .start_timer();
341
        let mut result = vec![];
Roman Zeyde's avatar
Roman Zeyde committed
342
        if let Some(ref mut last_entry) = self.last_header_entry {
343
            let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
344 345
            if *last_entry != entry {
                *last_entry = entry;
346
                let hex_header = hex::encode(serialize(last_entry.header()));
Roman Zeyde's avatar
Roman Zeyde committed
347
                let header = json!({"hex": hex_header, "height": last_entry.height()});
Roman Zeyde's avatar
Roman Zeyde committed
348 349 350 351 352
                result.push(json!({
                    "jsonrpc": "2.0",
                    "method": "blockchain.headers.subscribe",
                    "params": [header]}));
            }
353 354
        }
        for (script_hash, status_hash) in self.status_hashes.iter_mut() {
355
            let status = self.query.status(&script_hash[..])?;
356
            let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
357 358 359 360 361 362
            if new_status_hash == *status_hash {
                continue;
            }
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
Roman Zeyde's avatar
Roman Zeyde committed
363
                "params": [script_hash.to_hex(), new_status_hash]}));
364 365
            *status_hash = new_status_hash;
        }
366 367 368 369
        timer.observe_duration();
        self.stats
            .subscriptions
            .set(self.status_hashes.len() as i64);
370 371 372
        Ok(result)
    }

373 374 375 376 377 378 379 380
    /// Writes each value to the client as one line of JSON terminated by `\n`.
    ///
    /// Stops at the first write failure and reports which value could not be
    /// sent.
    fn send_values(&mut self, values: &[Value]) -> Result<()> {
        for value in values {
            let mut line = value.to_string();
            line.push('\n');
            self.stream
                .write_all(line.as_bytes())
                .chain_err(|| format!("failed to send {}", value))?;
        }
        Ok(())
    }

383
    fn handle_replies(&mut self) -> Result<()> {
384
        let empty_params = json!([]);
Roman Zeyde's avatar
Roman Zeyde committed
385
        loop {
386
            let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
387
            trace!("RPC {:?}", msg);
Roman Zeyde's avatar
Roman Zeyde committed
388 389 390
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
391 392 393 394 395
                    let reply = match (
                        cmd.get("method"),
                        cmd.get("params").unwrap_or_else(|| &empty_params),
                        cmd.get("id"),
                    ) {
Roman Zeyde's avatar
Roman Zeyde committed
396 397
                        (
                            Some(&Value::String(ref method)),
398
                            &Value::Array(ref params),
399
                            Some(ref id),
Roman Zeyde's avatar
Roman Zeyde committed
400 401 402
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
403
                    self.send_values(&[reply])?
404
                }
405
                Message::PeriodicUpdate => {
Roman Zeyde's avatar
Roman Zeyde committed
406 407
                    let values = self
                        .update_subscriptions()
408 409
                        .chain_err(|| "failed to update subscriptions")?;
                    self.send_values(&values)?
Roman Zeyde's avatar
Roman Zeyde committed
410
                }
411
                Message::Done => return Ok(()),
412
            }
Roman Zeyde's avatar
Roman Zeyde committed
413 414 415
        }
    }

416
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
417
        loop {
418
            let mut line = Vec::<u8>::new();
Roman Zeyde's avatar
Roman Zeyde committed
419
            reader
420 421
                .read_until(b'\n', &mut line)
                .chain_err(|| "failed to read a request")?;
Roman Zeyde's avatar
Roman Zeyde committed
422
            if line.is_empty() {
423 424
                tx.send(Message::Done).chain_err(|| "channel closed")?;
                return Ok(());
Roman Zeyde's avatar
Roman Zeyde committed
425
            } else {
426 427 428 429 430
                if line.starts_with(&[22, 3, 1]) {
                    // (very) naive SSL handshake detection
                    let _ = tx.send(Message::Done);
                    bail!("invalid request - maybe SSL-encrypted data?: {:?}", line)
                }
431
                match String::from_utf8(line) {
Roman Zeyde's avatar
Roman Zeyde committed
432 433
                    Ok(req) => tx
                        .send(Message::Request(req))
434 435 436
                        .chain_err(|| "channel closed")?,
                    Err(err) => {
                        let _ = tx.send(Message::Done);
437
                        bail!("invalid UTF8: {}", err)
438 439
                    }
                }
Roman Zeyde's avatar
Roman Zeyde committed
440 441 442 443
            }
        }
    }

444
    pub fn run(mut self) {
445
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
446
        let tx = self.chan.sender();
447
        let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
448
        if let Err(e) = self.handle_replies() {
449 450 451 452 453 454
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
455
        debug!("[{}] shutting down connection", self.addr);
456
        let _ = self.stream.shutdown(Shutdown::Both);
457 458
        if let Err(err) = child.join().expect("receiver panicked") {
            error!("[{}] receiver failed: {}", self.addr, err);
459
        }
460 461 462
    }
}

463
/// Messages consumed by a connection's reply loop.
#[derive(Debug)]
pub enum Message {
    /// One raw request line received from the client.
    Request(String),
    /// Ask the connection to check its subscriptions and send notifications.
    PeriodicUpdate,
    /// Ask the reply loop to stop.
    Done,
}

470 471 472 473 474
/// Messages handled by the notifier thread of the RPC server.
pub enum Notification {
    /// Forwarded to every connection as `Message::PeriodicUpdate`.
    Periodic,
    /// Makes the notifier send `None` on the acceptor channel.
    Exit,
}

475
pub struct RPC {
476
    notification: Sender<Notification>,
Roman Zeyde's avatar
Roman Zeyde committed
477
    server: Option<thread::JoinHandle<()>>, // so we can join the server while dropping this ojbect
478 479
}

480 481 482 483 484
/// Metrics shared by all connections.
struct Stats {
    /// Latency histogram, labelled by RPC method name ("periodic_update" for
    /// subscription updates).
    latency: HistogramVec,
    /// Gauge set to the number of subscribed script hashes of the connection
    /// that updated its subscriptions most recently.
    subscriptions: Gauge,
}

485
impl RPC {
486 487
    fn start_notifier(
        notification: Channel<Notification>,
488
        senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
489
        acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
490
    ) {
491
        spawn_thread("notification", move || {
492
            for msg in notification.receiver().iter() {
493
                let mut senders = senders.lock().unwrap();
494
                match msg {
Roman Zeyde's avatar
Roman Zeyde committed
495 496 497 498 499 500 501 502
                    Notification::Periodic => {
                        for sender in senders.split_off(0) {
                            if let Err(TrySendError::Disconnected(_)) =
                                sender.try_send(Message::PeriodicUpdate)
                            {
                                continue;
                            }
                            senders.push(sender);
503
                        }
Roman Zeyde's avatar
Roman Zeyde committed
504
                    }
505
                    Notification::Exit => acceptor.send(None).unwrap(),
506 507
                }
            }
508
        });
509 510
    }

511
    fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
Roman Zeyde's avatar
Roman Zeyde committed
512
        let chan = Channel::unbounded();
513
        let acceptor = chan.sender();
514
        spawn_thread("acceptor", move || {
Roman Zeyde's avatar
Roman Zeyde committed
515 516
            let listener =
                TcpListener::bind(addr).unwrap_or_else(|e| panic!("bind({}) failed: {}", addr, e));
517
            info!("RPC server running on {}", addr);
518 519
            loop {
                let (stream, addr) = listener.accept().expect("accept failed");
Roman Zeyde's avatar
Roman Zeyde committed
520 521 522
                stream
                    .set_nonblocking(false)
                    .expect("failed to set connection as blocking");
523
                acceptor.send(Some((stream, addr))).expect("send failed");
524 525 526 527 528
            }
        });
        chan
    }

529 530
    pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
        let stats = Arc::new(Stats {
531
            latency: metrics.histogram_vec(
532
                HistogramOpts::new("electrs_electrum_rpc", "Electrum RPC latency (seconds)"),
533 534 535
                &["method"],
            ),
            subscriptions: metrics.gauge(MetricOpts::new(
536
                "electrs_electrum_subscriptions",
537 538 539
                "# of Electrum subscriptions",
            )),
        });
Roman Zeyde's avatar
Roman Zeyde committed
540
        let notification = Channel::unbounded();
Roman Zeyde's avatar
Roman Zeyde committed
541
        RPC {
542
            notification: notification.sender(),
Roman Zeyde's avatar
Roman Zeyde committed
543
            server: Some(spawn_thread("rpc", move || {
544 545 546
                let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
                let acceptor = RPC::start_acceptor(addr);
                RPC::start_notifier(notification, senders.clone(), acceptor.sender());
547
                let mut children = vec![];
548 549 550 551
                while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
                    let query = query.clone();
                    let senders = senders.clone();
                    let stats = stats.clone();
552
                    children.push(spawn_thread("peer", move || {
553 554 555 556 557
                        info!("[{}] connected peer", addr);
                        let conn = Connection::new(query, stream, addr, stats);
                        senders.lock().unwrap().push(conn.chan.sender());
                        conn.run();
                        info!("[{}] disconnected peer", addr);
558 559
                    }));
                }
560
                trace!("closing {} RPC connections", senders.lock().unwrap().len());
561 562 563
                for sender in senders.lock().unwrap().iter() {
                    let _ = sender.send(Message::Done);
                }
564
                trace!("waiting for {} RPC handling threads", children.len());
565 566
                for child in children {
                    let _ = child.join();
567
                }
Roman Zeyde's avatar
Roman Zeyde committed
568
                trace!("RPC connections are closed");
Roman Zeyde's avatar
Roman Zeyde committed
569
            })),
Roman Zeyde's avatar
Roman Zeyde committed
570
        }
571 572 573
    }

    pub fn notify(&self) {
574 575
        self.notification.send(Notification::Periodic).unwrap();
    }
Roman Zeyde's avatar
Roman Zeyde committed
576
}
577

Roman Zeyde's avatar
Roman Zeyde committed
578 579
impl Drop for RPC {
    fn drop(&mut self) {
Roman Zeyde's avatar
Roman Zeyde committed
580
        trace!("stop accepting new RPCs");
581
        self.notification.send(Notification::Exit).unwrap();
Roman Zeyde's avatar
Roman Zeyde committed
582 583 584
        if let Some(handle) = self.server.take() {
            handle.join().unwrap();
        }
Roman Zeyde's avatar
Roman Zeyde committed
585
        trace!("RPC server is stopped");
586
    }
Roman Zeyde's avatar
Roman Zeyde committed
587
}