Unverified Commit 4d783a38 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Replace RPC timing with Prometheus monitoring

parent 4b0773e5
......@@ -38,7 +38,7 @@ fn run_server(config: &Config) -> Result<()> {
let app = App::new(store, index, daemon);
let query = Query::new(app.clone(), &metrics);
let rpc = RPC::start(config.rpc_addr, query.clone());
let rpc = RPC::start(config.rpc_addr, query.clone(), &metrics);
while let None = signal.wait(Duration::from_secs(5)) {
query.update_mempool()?;
if tip != app.daemon().getbestblockhash()? {
......
......@@ -11,8 +11,9 @@ use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySe
use std::sync::{Arc, Mutex};
use std::thread;
use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use query::Query;
use util::{HeaderEntry, Timer};
use util::HeaderEntry;
use errors::*;
......@@ -50,17 +51,24 @@ struct Connection {
stream: TcpStream,
addr: SocketAddr,
chan: Channel,
stats: Arc<Stats>,
}
impl Connection {
pub fn new(query: Arc<Query>, stream: TcpStream, addr: SocketAddr) -> Connection {
pub fn new(
query: Arc<Query>,
stream: TcpStream,
addr: SocketAddr,
stats: Arc<Stats>,
) -> Connection {
Connection {
query: query,
query,
last_header_entry: None, // disable header subscription for now
status_hashes: HashMap::new(),
stream,
addr,
chan: Channel::new(),
stats,
}
}
......@@ -189,6 +197,10 @@ impl Connection {
}
fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
let timer = self.stats
.latency
.with_label_values(&[method])
.start_timer();
let result = match method {
"blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
"server.version" => self.server_version(),
......@@ -208,6 +220,7 @@ impl Connection {
"blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
&_ => bail!("unknown method {} {:?}", method, params),
};
timer.observe_duration();
// TODO: return application errors should be sent to the client
Ok(match result {
Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
......@@ -225,8 +238,11 @@ impl Connection {
}
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
let timer = self.stats
.latency
.with_label_values(&["periodic_update"])
.start_timer();
let mut result = vec![];
let mut timer = Timer::new();
if let Some(ref mut last_entry) = self.last_header_entry {
let entry = self.query.get_best_header()?;
if *last_entry != entry {
......@@ -251,18 +267,15 @@ impl Connection {
"params": [script_hash.be_hex_string(), new_status_hash]}));
*status_hash = new_status_hash;
}
timer.tick("total");
debug!(
"updating {} subscriptions {:?}",
self.status_hashes.len(),
timer
);
timer.observe_duration();
self.stats
.subscriptions
.set(self.status_hashes.len() as i64);
Ok(result)
}
fn send_values(&mut self, values: &[Value]) -> Result<()> {
for value in values {
debug!("[{}] <- {}", self.addr, value);
let line = value.to_string() + "\n";
self.stream
.write_all(line.as_bytes())
......@@ -277,7 +290,6 @@ impl Connection {
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
debug!("[{}] -> {}", self.addr, cmd);
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
......@@ -366,6 +378,11 @@ pub struct RPC {
notification: Sender<()>,
}
struct Stats {
latency: HistogramVec,
subscriptions: Gauge,
}
impl RPC {
fn start_notification_worker(
receiver: Receiver<()>,
......@@ -386,7 +403,17 @@ impl RPC {
});
}
pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
let stats = Arc::new(Stats {
latency: metrics.histogram(
HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"),
&["method"],
),
subscriptions: metrics.gauge(MetricOpts::new(
"electrum_subscriptions",
"# of Electrum subscriptions",
)),
});
let (tx, rx) = channel();
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
......@@ -397,9 +424,10 @@ impl RPC {
let (stream, addr) = listener.accept().expect("accept failed");
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
thread::spawn(move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr);
let conn = Connection::new(query, stream, addr, stats);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment