rpc.rs 19.5 KB
Newer Older
1 2
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::{deserialize, serialize};
3
use bitcoin::util::address::Address;
4
use bitcoin::util::hash::Sha256dHash;
5
use error_chain::ChainedError;
6
use hex;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
7
use serde_json::{from_str, Number, Value};
Roman Zeyde's avatar
Roman Zeyde committed
8
use std::collections::HashMap;
9
use std::io::{BufRead, BufReader, Write};
10
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
11
use std::str::FromStr;
12
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
13
use std::sync::{Arc, Mutex};
14
use std::thread;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
15

16
use index::compute_script_hash;
17
use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
18
use query::{Query, Status};
19
use util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
20

Roman Zeyde's avatar
Roman Zeyde committed
21
use errors::*;
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
22

Roman Zeyde's avatar
Roman Zeyde committed
23
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
Roman Zeyde's avatar
Roman Zeyde committed
24 25
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let script_hash = val.chain_err(|| "missing hash")?;
Roman Zeyde's avatar
Roman Zeyde committed
26 27 28 29 30
    let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
    let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
    Ok(script_hash)
}

31 32 33 34 35 36
fn usize_from_value(val: Option<&Value>, name: &str) -> Result<usize> {
    let val = val.chain_err(|| format!("missing {}", name))?;
    let val = val.as_u64().chain_err(|| format!("non-integer {}", name))?;
    Ok(val as usize)
}

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
fn unspent_from_status(status: &Status) -> Value {
    json!(Value::Array(
        status
            .unspent()
            .into_iter()
            .map(|out| {
                json!({
                "height": out.height,
                "tx_pos": out.output_index,
                "tx_hash": out.txn_id.be_hex_string(),
                "value": out.value,
            })
            })
            .collect()
    ))
}

54 55
fn jsonify_header(entry: &HeaderEntry) -> Value {
    let header = entry.header();
Roman Zeyde's avatar
Roman Zeyde committed
56
    json!({
57
        "block_height": entry.height(),
Roman Zeyde's avatar
Roman Zeyde committed
58 59 60 61 62 63 64 65 66
        "version": header.version,
        "prev_block_hash": header.prev_blockhash.be_hex_string(),
        "merkle_root": header.merkle_root.be_hex_string(),
        "timestamp": header.time,
        "bits": header.bits,
        "nonce": header.nonce
    })
}

67 68
struct Connection {
    query: Arc<Query>,
Roman Zeyde's avatar
Roman Zeyde committed
69
    last_header_entry: Option<HeaderEntry>,
70
    status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
71 72
    stream: TcpStream,
    addr: SocketAddr,
73
    chan: SyncChannel<Message>,
74
    stats: Arc<Stats>,
75 76
}

77
impl Connection {
78 79 80 81 82 83
    pub fn new(
        query: Arc<Query>,
        stream: TcpStream,
        addr: SocketAddr,
        stats: Arc<Stats>,
    ) -> Connection {
84
        Connection {
85
            query,
Roman Zeyde's avatar
Roman Zeyde committed
86
            last_header_entry: None, // disable header subscription for now
87
            status_hashes: HashMap::new(),
88 89
            stream,
            addr,
90
            chan: SyncChannel::new(10),
91
            stats,
92 93 94 95
        }
    }

    fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
96
        let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
97 98 99 100
        let hex_header = hex::encode(serialize(entry.header()).unwrap());
        let result = json!({"hex": hex_header, "height": entry.height()});
        self.last_header_entry = Some(entry);
        Ok(result)
101
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
102

103
    fn server_version(&self) -> Result<Value> {
104
        Ok(json!(["RustElectrum 0.1.0", "1.2"]))
105
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
106

107
    fn server_banner(&self) -> Result<Value> {
108
        Ok(json!("Welcome to RustElectrum Server!\n"))
109
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
110

111
    fn server_donation_address(&self) -> Result<Value> {
112
        Ok(Value::Null)
113
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
114

115 116 117
    /// Handles `server.peers.subscribe`: always answers with an empty JSON array.
    fn server_peers_subscribe(&self) -> Result<Value> {
        let no_peers = json!([]);
        Ok(no_peers)
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
118

119
    fn mempool_get_fee_histogram(&self) -> Result<Value> {
120
        Ok(json!(self.query.get_fee_histogram()))
121
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
122

123 124 125 126
    fn blockchain_block_headers(&self, params: &[Value]) -> Result<Value> {
        let start_height = usize_from_value(params.get(0), "start_height")?;
        let count = usize_from_value(params.get(1), "count")?;
        let heights: Vec<usize> = (start_height..(start_height + count)).collect();
Roman Zeyde's avatar
Roman Zeyde committed
127 128 129
        let headers: Vec<String> = self.query
            .get_headers(&heights)
            .into_iter()
130
            .map(|entry| hex::encode(&serialize(entry.header()).unwrap()))
Roman Zeyde's avatar
Roman Zeyde committed
131
            .collect();
132 133 134 135 136
        Ok(json!({
            "count": headers.len(),
            "hex": headers.join(""),
            "max": 2016,
        }))
137 138
    }

Roman Zeyde's avatar
Roman Zeyde committed
139
    fn blockchain_block_get_header(&self, params: &[Value]) -> Result<Value> {
140
        let height = usize_from_value(params.get(0), "missing height")?;
141 142 143 144 145 146
        let mut entries = self.query.get_headers(&[height]);
        let entry = entries
            .pop()
            .chain_err(|| format!("missing header #{}", height))?;
        assert_eq!(entries.len(), 0);
        Ok(json!(jsonify_header(&entry)))
Roman Zeyde's avatar
Roman Zeyde committed
147 148
    }

149
    fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
150 151
        let blocks_count = usize_from_value(params.get(0), "blocks_count")?;
        let fee_rate = self.query.estimate_fee(blocks_count); // in BTC/kB
152
        Ok(json!(fee_rate))
153
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
154

Roman Zeyde's avatar
Roman Zeyde committed
155
    fn blockchain_relayfee(&self) -> Result<Value> {
156
        Ok(json!(0.0)) // allow sending transactions with any fee.
Roman Zeyde's avatar
Roman Zeyde committed
157 158
    }

159
    fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
160
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
161
        let status = self.query.status(&script_hash[..])?;
162
        let result = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
163 164
        self.status_hashes.insert(script_hash, result.clone());
        Ok(result)
165
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
166

167
    fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
168
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
169
        let status = self.query.status(&script_hash[..])?;
170 171 172
        Ok(
            json!({ "confirmed": status.confirmed_balance(), "unconfirmed": status.mempool_balance() }),
        )
173
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
174

Roman Zeyde's avatar
Roman Zeyde committed
175
    fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
176
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
177
        let status = self.query.status(&script_hash[..])?;
Roman Zeyde's avatar
Roman Zeyde committed
178
        Ok(json!(Value::Array(
179 180
            status
                .history()
Roman Zeyde's avatar
Roman Zeyde committed
181 182 183 184
                .into_iter()
                .map(|item| json!({"height": item.0, "tx_hash": item.1.be_hex_string()}))
                .collect()
        )))
185
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
186

187 188
    fn blockchain_scripthash_listunspent(&self, params: &[Value]) -> Result<Value> {
        let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
189 190 191 192 193 194 195 196 197 198 199 200
        Ok(unspent_from_status(&self.query.status(&script_hash[..])?))
    }

    fn blockchain_address_listunspent(&self, params: &[Value]) -> Result<Value> {
        let address = params
            .get(0)
            .chain_err(|| "no address")?
            .as_str()
            .chain_err(|| "non-string address")?;
        let addr = Address::from_str(address).chain_err(|| format!("invalid address {}", address))?;
        let script_hash = compute_script_hash(&addr.script_pubkey().into_vec());
        Ok(unspent_from_status(&self.query.status(&script_hash[..])?))
201 202
    }

203 204 205 206 207
    fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
        let tx = params.get(0).chain_err(|| "missing tx")?;
        let tx = tx.as_str().chain_err(|| "non-string tx")?;
        let tx = hex::decode(&tx).chain_err(|| "non-hex tx")?;
        let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
208
        let txid = self.query.broadcast(&tx)?;
209 210 211
        Ok(json!(txid.be_hex_string()))
    }

212
    fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
Roman Zeyde's avatar
Roman Zeyde committed
213
        // TODO: handle 'verbose' param
Roman Zeyde's avatar
Roman Zeyde committed
214
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
215
        let tx = self.query.load_txn(&tx_hash, /*blockhash=*/ None)?;
216
        Ok(json!(hex::encode(&serialize(&tx).unwrap())))
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
217 218
    }

Roman Zeyde's avatar
Roman Zeyde committed
219 220
    fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
        let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
221
        let height = usize_from_value(params.get(1), "height")?;
Roman Zeyde's avatar
Roman Zeyde committed
222 223 224 225 226 227 228 229 230 231 232
        let (merkle, pos) = self.query
            .get_merkle_proof(&tx_hash, height)
            .chain_err(|| "cannot create merkle proof")?;
        let merkle: Vec<String> = merkle
            .into_iter()
            .map(|txid| txid.be_hex_string())
            .collect();
        Ok(json!({
                "block_height": height,
                "merkle": merkle,
                "pos": pos}))
233
    }
Roman Zeyde's avatar
WiP  
Roman Zeyde committed
234

235
    fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
236 237 238 239
        let timer = self.stats
            .latency
            .with_label_values(&[method])
            .start_timer();
240 241 242 243 244 245 246
        let result = match method {
            "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
            "server.version" => self.server_version(),
            "server.banner" => self.server_banner(),
            "server.donation_address" => self.server_donation_address(),
            "server.peers.subscribe" => self.server_peers_subscribe(),
            "mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
247
            "blockchain.block.headers" => self.blockchain_block_headers(&params),
Roman Zeyde's avatar
Roman Zeyde committed
248
            "blockchain.block.get_header" => self.blockchain_block_get_header(&params),
249
            "blockchain.estimatefee" => self.blockchain_estimatefee(&params),
Roman Zeyde's avatar
Roman Zeyde committed
250
            "blockchain.relayfee" => self.blockchain_relayfee(),
251
            "blockchain.address.listunspent" => self.blockchain_address_listunspent(&params),
252 253 254
            "blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
            "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
            "blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
255
            "blockchain.scripthash.listunspent" => self.blockchain_scripthash_listunspent(&params),
256
            "blockchain.transaction.broadcast" => self.blockchain_transaction_broadcast(&params),
257 258 259
            "blockchain.transaction.get" => self.blockchain_transaction_get(&params),
            "blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
            &_ => bail!("unknown method {} {:?}", method, params),
260
        };
261
        timer.observe_duration();
262
        // TODO: return application errors should be sent to the client
263 264 265
        Ok(match result {
            Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
            Err(e) => {
Roman Zeyde's avatar
Roman Zeyde committed
266 267 268 269 270 271 272
                warn!(
                    "rpc #{} {} {:?} failed: {}",
                    id,
                    method,
                    params,
                    e.display_chain()
                );
273 274 275
                json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
            }
        })
276 277
    }

Roman Zeyde's avatar
Roman Zeyde committed
278
    fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
279 280 281 282
        let timer = self.stats
            .latency
            .with_label_values(&["periodic_update"])
            .start_timer();
283
        let mut result = vec![];
Roman Zeyde's avatar
Roman Zeyde committed
284
        if let Some(ref mut last_entry) = self.last_header_entry {
285
            let entry = self.query.get_best_header()?;
Roman Zeyde's avatar
Roman Zeyde committed
286 287
            if *last_entry != entry {
                *last_entry = entry;
Roman Zeyde's avatar
Roman Zeyde committed
288 289
                let hex_header = hex::encode(serialize(last_entry.header()).unwrap());
                let header = json!({"hex": hex_header, "height": last_entry.height()});
Roman Zeyde's avatar
Roman Zeyde committed
290 291 292 293 294
                result.push(json!({
                    "jsonrpc": "2.0",
                    "method": "blockchain.headers.subscribe",
                    "params": [header]}));
            }
295 296
        }
        for (script_hash, status_hash) in self.status_hashes.iter_mut() {
297
            let status = self.query.status(&script_hash[..])?;
298
            let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
299 300 301 302 303 304 305 306 307
            if new_status_hash == *status_hash {
                continue;
            }
            result.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
                "params": [script_hash.be_hex_string(), new_status_hash]}));
            *status_hash = new_status_hash;
        }
308 309 310 311
        timer.observe_duration();
        self.stats
            .subscriptions
            .set(self.status_hashes.len() as i64);
312 313 314
        Ok(result)
    }

315 316 317 318 319 320 321 322
    /// Writes each value to the client socket as one newline-terminated JSON line.
    ///
    /// Stops at the first write failure and reports which value could not be sent.
    fn send_values(&mut self, values: &[Value]) -> Result<()> {
        for value in values {
            let line = format!("{}\n", value);
            let written = self.stream.write_all(line.as_bytes());
            written.chain_err(|| format!("failed to send {}", value))?;
        }
        Ok(())
    }

325
    fn handle_replies(&mut self) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
326
        loop {
327
            let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
328
            trace!("RPC {:?}", msg);
Roman Zeyde's avatar
Roman Zeyde committed
329 330 331 332 333 334 335 336 337 338 339
            match msg {
                Message::Request(line) => {
                    let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
                    let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
                        (
                            Some(&Value::String(ref method)),
                            Some(&Value::Array(ref params)),
                            Some(&Value::Number(ref id)),
                        ) => self.handle_command(method, params, id)?,
                        _ => bail!("invalid command: {}", cmd),
                    };
340
                    self.send_values(&[reply])?
341
                }
342
                Message::PeriodicUpdate => {
343 344 345
                    let values = self.update_subscriptions()
                        .chain_err(|| "failed to update subscriptions")?;
                    self.send_values(&values)?
Roman Zeyde's avatar
Roman Zeyde committed
346
                }
347
                Message::Done => return Ok(()),
348
            }
Roman Zeyde's avatar
Roman Zeyde committed
349 350 351
        }
    }

352
    fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
Roman Zeyde's avatar
Roman Zeyde committed
353
        loop {
354
            let mut line = Vec::<u8>::new();
Roman Zeyde's avatar
Roman Zeyde committed
355
            reader
356 357
                .read_until(b'\n', &mut line)
                .chain_err(|| "failed to read a request")?;
Roman Zeyde's avatar
Roman Zeyde committed
358
            if line.is_empty() {
359 360
                tx.send(Message::Done).chain_err(|| "channel closed")?;
                return Ok(());
Roman Zeyde's avatar
Roman Zeyde committed
361
            } else {
362 363 364 365 366 367 368 369 370 371 372 373
                match String::from_utf8(line) {
                    Ok(req) => tx.send(Message::Request(req))
                        .chain_err(|| "channel closed")?,
                    Err(err) => {
                        let _ = tx.send(Message::Done);
                        bail!(
                            "invalid UTF8 {:?}: {:?}",
                            String::from_utf8_lossy(err.as_bytes()),
                            err
                        )
                    }
                }
Roman Zeyde's avatar
Roman Zeyde committed
374 375 376 377
            }
        }
    }

378
    pub fn run(mut self) {
379
        let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
380
        let tx = self.chan.sender();
381
        let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
382
        if let Err(e) = self.handle_replies() {
383 384 385 386 387 388
            error!(
                "[{}] connection handling failed: {}",
                self.addr,
                e.display_chain().to_string()
            );
        }
389
        info!("[{}] shutting down connection", self.addr);
390
        let _ = self.stream.shutdown(Shutdown::Both);
391 392
        if let Err(err) = child.join().expect("receiver panicked") {
            error!("[{}] receiver failed: {}", self.addr, err);
393
        }
394 395 396
    }
}

397
/// Messages consumed by a connection's reply loop (`Connection::handle_replies`).
#[derive(Debug)]
pub enum Message {
    /// One raw request line read from the client.
    Request(String),
    /// Check the connection's subscriptions and send any resulting notifications.
    PeriodicUpdate,
    /// Stop the reply loop.
    Done,
}

404 405 406 407 408
/// Control messages handled by the "notification" thread (see `RPC::start_notifier`).
pub enum Notification {
    /// Forward a `Message::PeriodicUpdate` to every live connection.
    Periodic,
    /// Send `None` through the acceptor channel, which ends the server's accept loop.
    Exit,
}

409
pub struct RPC {
410
    notification: Sender<Notification>,
Roman Zeyde's avatar
Roman Zeyde committed
411
    server: Option<thread::JoinHandle<()>>, // so we can join the server while dropping this ojbect
412 413
}

414 415 416 417 418
/// Metrics shared by all connections.
struct Stats {
    /// Request latency histogram, labelled by RPC method name (or "periodic_update").
    latency: HistogramVec,
    /// Set by each connection to its number of script-hash subscriptions.
    subscriptions: Gauge,
}

419
impl RPC {
420 421
    fn start_notifier(
        notification: Channel<Notification>,
422
        senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
423
        acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
424
    ) {
425
        spawn_thread("notification", move || {
426
            for msg in notification.receiver().iter() {
427
                let mut senders = senders.lock().unwrap();
428 429 430 431 432 433 434 435 436 437
                match msg {
                    Notification::Periodic => for sender in senders.split_off(0) {
                        if let Err(TrySendError::Disconnected(_)) =
                            sender.try_send(Message::PeriodicUpdate)
                        {
                            continue;
                        }
                        senders.push(sender);
                    },
                    Notification::Exit => acceptor.send(None).unwrap(),
438 439
                }
            }
440
        });
441 442
    }

443
    fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
444
        let chan = Channel::new();
445
        let acceptor = chan.sender();
446
        spawn_thread("acceptor", move || {
447
            let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
448
            info!("RPC server running on {}", addr);
449 450
            loop {
                let (stream, addr) = listener.accept().expect("accept failed");
451
                acceptor.send(Some((stream, addr))).expect("send failed");
452 453 454 455 456
            }
        });
        chan
    }

457 458
    pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
        let stats = Arc::new(Stats {
459
            latency: metrics.histogram_vec(
460 461 462 463 464 465 466 467
                HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"),
                &["method"],
            ),
            subscriptions: metrics.gauge(MetricOpts::new(
                "electrum_subscriptions",
                "# of Electrum subscriptions",
            )),
        });
468 469 470
        let notification = Channel::new();
        let handle = RPC {
            notification: notification.sender(),
Roman Zeyde's avatar
Roman Zeyde committed
471
            server: Some(spawn_thread("rpc", move || {
472 473 474
                let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
                let acceptor = RPC::start_acceptor(addr);
                RPC::start_notifier(notification, senders.clone(), acceptor.sender());
475
                let mut children = vec![];
476 477 478 479
                while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
                    let query = query.clone();
                    let senders = senders.clone();
                    let stats = stats.clone();
480
                    children.push(spawn_thread("peer", move || {
481 482 483 484 485
                        info!("[{}] connected peer", addr);
                        let conn = Connection::new(query, stream, addr, stats);
                        senders.lock().unwrap().push(conn.chan.sender());
                        conn.run();
                        info!("[{}] disconnected peer", addr);
486 487
                    }));
                }
Roman Zeyde's avatar
Roman Zeyde committed
488
                trace!("closing RPC connections");
489 490 491 492 493
                for sender in senders.lock().unwrap().iter() {
                    let _ = sender.send(Message::Done);
                }
                for child in children {
                    let _ = child.join();
494
                }
Roman Zeyde's avatar
Roman Zeyde committed
495
                trace!("RPC connections are closed");
Roman Zeyde's avatar
Roman Zeyde committed
496
            })),
497 498
        };
        handle
499 500 501
    }

    pub fn notify(&self) {
502 503
        self.notification.send(Notification::Periodic).unwrap();
    }
Roman Zeyde's avatar
Roman Zeyde committed
504
}
505

Roman Zeyde's avatar
Roman Zeyde committed
506 507
impl Drop for RPC {
    fn drop(&mut self) {
Roman Zeyde's avatar
Roman Zeyde committed
508
        trace!("stop accepting new RPCs");
509
        self.notification.send(Notification::Exit).unwrap();
Roman Zeyde's avatar
Roman Zeyde committed
510
        self.server.take().map(|t| t.join().unwrap());
Roman Zeyde's avatar
Roman Zeyde committed
511
        trace!("RPC server is stopped");
512
    }
Roman Zeyde's avatar
Roman Zeyde committed
513
}