Commit 13bf3d15 authored by kenshin-samourai's avatar kenshin-samourai
Browse files

remove support of rpc notifications

parent 058ba1f4
......@@ -61,9 +61,7 @@ fn run_server(config: &Config) -> Result<()> {
loop {
app.update(&signal)?;
query.update_mempool()?;
server
.get_or_insert_with(|| RPC::start(config.electrum_rpc_addr, query.clone()))
.notify(); // update subscribed clients
server.get_or_insert_with(|| RPC::start(config.electrum_rpc_addr, query.clone()));
if let Err(err) = signal.wait(Duration::from_secs(5)) {
info!("stopping server: {}", err);
break;
......
use bitcoin::consensus::encode::serialize;
use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use error_chain::ChainedError;
use hex;
use serde_json::{from_str, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Mutex};
use std::thread;
use crate::errors::*;
use crate::query::Query;
use crate::util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
use crate::util::{spawn_thread, Channel, SyncChannel};
const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: &str = "1.4";
......@@ -29,8 +26,6 @@ fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
struct Connection {
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: SyncChannel<Message>,
......@@ -44,8 +39,6 @@ impl Connection {
) -> Connection {
Connection {
query,
last_header_entry: None, // disable header subscription for now
status_hashes: HashMap::new(),
stream,
addr,
chan: SyncChannel::new(10),
......@@ -94,35 +87,6 @@ impl Connection {
})
}
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
let mut result = vec![];
if let Some(ref mut last_entry) = self.last_header_entry {
let entry = self.query.get_best_header()?;
if *last_entry != entry {
*last_entry = entry;
let hex_header = hex::encode(serialize(last_entry.header()));
let header = json!({"hex": hex_header, "height": last_entry.height()});
result.push(json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [header]}));
}
}
for (script_hash, status_hash) in self.status_hashes.iter_mut() {
let status = self.query.status(&script_hash[..])?;
let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
if new_status_hash == *status_hash {
continue;
}
result.push(json!({
"jsonrpc": "2.0",
"method": "blockchain.scripthash.subscribe",
"params": [script_hash.to_hex(), new_status_hash]}));
*status_hash = new_status_hash;
}
Ok(result)
}
fn send_values(&mut self, values: &[Value]) -> Result<()> {
for value in values {
let line = value.to_string() + "\n";
......@@ -155,12 +119,6 @@ impl Connection {
};
self.send_values(&[reply])?
}
Message::PeriodicUpdate => {
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
}
Message::Done => return Ok(()),
}
}
......@@ -216,46 +174,14 @@ impl Connection {
#[derive(Debug)]
pub enum Message {
Request(String),
PeriodicUpdate,
Done,
}
pub enum Notification {
Periodic,
Exit,
}
pub struct RPC {
notification: Sender<Notification>,
server: Option<thread::JoinHandle<()>>, // so we can join the server while dropping this ojbect
}
impl RPC {
fn start_notifier(
notification: Channel<Notification>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
) {
spawn_thread("notification", move || {
for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
}
}
Notification::Exit => acceptor.send(None).unwrap(),
}
}
});
}
fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
let chan = Channel::unbounded();
let acceptor = chan.sender();
......@@ -278,14 +204,10 @@ impl RPC {
}
pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
let notification = Channel::unbounded();
RPC {
notification: notification.sender(),
server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
let mut children = vec![];
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone();
......@@ -310,16 +232,11 @@ impl RPC {
})),
}
}
pub fn notify(&self) {
self.notification.send(Notification::Periodic).unwrap();
}
}
impl Drop for RPC {
fn drop(&mut self) {
trace!("stop accepting new RPCs");
self.notification.send(Notification::Exit).unwrap();
if let Some(handle) = self.server.take() {
handle.join().unwrap();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment