Commit f7444940 authored by kenshin-samourai's avatar kenshin-samourai
Browse files

remove support of metrics from index.rs

parent f3711652
......@@ -41,7 +41,7 @@ fn run_server(config: &Config) -> Result<()> {
)?;
// Perform initial indexing from local blk*.dat block files.
let store = DBStore::open(&config.db_path, /*low_memory=*/ config.jsonrpc_import);
let index = Index::load(&store, &daemon, &metrics, config.index_batch_size)?;
let index = Index::load(&store, &daemon, config.index_batch_size)?;
let store = if is_fully_compacted(&store) {
store // initial import and full compaction are over
} else if config.jsonrpc_import {
......
......@@ -12,9 +12,6 @@ use std::sync::RwLock;
use crate::daemon::Daemon;
use crate::errors::*;
use crate::metrics::{
Counter, Gauge, HistogramOpts, HistogramTimer, HistogramVec, MetricOpts, Metrics,
};
use crate::signal::Waiter;
use crate::store::{ReadStore, Row, WriteStore};
use crate::util::{
......@@ -274,63 +271,10 @@ fn read_indexed_headers(store: &dyn ReadStore) -> HeaderList {
result
}
struct Stats {
blocks: Counter,
txns: Counter,
vsize: Counter,
height: Gauge,
duration: HistogramVec,
}
impl Stats {
fn new(metrics: &Metrics) -> Stats {
Stats {
blocks: metrics.counter(MetricOpts::new(
"electrs_index_blocks",
"# of indexed blocks",
)),
txns: metrics.counter(MetricOpts::new(
"electrs_index_txns",
"# of indexed transactions",
)),
vsize: metrics.counter(MetricOpts::new(
"electrs_index_vsize",
"# of indexed vbytes",
)),
height: metrics.gauge(MetricOpts::new(
"electrs_index_height",
"Last indexed block's height",
)),
duration: metrics.histogram_vec(
HistogramOpts::new("electrs_index_duration", "indexing duration (in seconds)"),
&["step"],
),
}
}
fn update(&self, block: &Block, height: usize) {
self.blocks.inc();
self.txns.inc_by(block.txdata.len() as i64);
for tx in &block.txdata {
self.vsize.inc_by(tx.get_weight() as i64 / 4);
}
self.update_height(height);
}
fn update_height(&self, height: usize) {
self.height.set(height as i64);
}
fn start_timer(&self, step: &str) -> HistogramTimer {
self.duration.with_label_values(&[step]).start_timer()
}
}
pub struct Index {
// TODO: store also latest snapshot.
headers: RwLock<HeaderList>,
daemon: Daemon,
stats: Stats,
batch_size: usize,
}
......@@ -338,16 +282,12 @@ impl Index {
pub fn load(
store: &dyn ReadStore,
daemon: &Daemon,
metrics: &Metrics,
batch_size: usize,
) -> Result<Index> {
let stats = Stats::new(metrics);
let headers = read_indexed_headers(store);
stats.height.set((headers.len() as i64) - 1);
Ok(Index {
headers: RwLock::new(headers),
daemon: daemon.reconnect()?,
stats,
batch_size,
})
}
......@@ -373,13 +313,16 @@ impl Index {
pub fn update(&self, store: &impl WriteStore, waiter: &Waiter) -> Result<Sha256dHash> {
let daemon = self.daemon.reconnect()?;
let tip = daemon.getbestblockhash()?;
let new_headers: Vec<HeaderEntry> = {
let indexed_headers = self.headers.read().unwrap();
indexed_headers.order(daemon.get_new_headers(&indexed_headers, &tip)?)
};
if let Some(latest_header) = new_headers.last() {
info!("{:?} ({} left to index)", latest_header, new_headers.len());
};
let height_map = HashMap::<Sha256dHash, usize>::from_iter(
new_headers.iter().map(|h| (*h.hash(), h.height())),
);
......@@ -388,6 +331,7 @@ impl Index {
let sender = chan.sender();
let blockhashes: Vec<Sha256dHash> = new_headers.iter().map(|h| *h.hash()).collect();
let batch_size = self.batch_size;
let fetcher = spawn_thread("fetcher", move || {
for chunk in blockhashes.chunks(batch_size) {
sender
......@@ -398,14 +342,15 @@ impl Index {
.send(Ok(vec![]))
.expect("failed sending explicit end of stream");
});
loop {
waiter.poll()?;
let timer = self.stats.start_timer("fetch");
let batch = chan
.receiver()
.recv()
.expect("block fetch exited prematurely")?;
timer.observe_duration();
if batch.is_empty() {
break;
}
......@@ -415,24 +360,16 @@ impl Index {
let height = *height_map
.get(&blockhash)
.unwrap_or_else(|| panic!("missing header for block {}", blockhash));
self.stats.update(block, height); // TODO: update stats after the block is indexed
index_block(block, height).chain(std::iter::once(last_indexed_block(&blockhash)))
});
let timer = self.stats.start_timer("index+write");
store.write(rows_iter);
timer.observe_duration();
}
let timer = self.stats.start_timer("flush");
store.flush(); // make sure no row is left behind
timer.observe_duration();
store.flush(); // make sure no row is left behind
fetcher.join().expect("block fetcher failed");
self.headers.write().unwrap().apply(new_headers, tip);
assert_eq!(tip, self.headers.read().unwrap().tip());
self.stats
.update_height(self.headers.read().unwrap().len() - 1);
Ok(tip)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment