Commit 1673b154 authored by kenshin-samourai's avatar kenshin-samourai
Browse files

remove support of metrics from bulk.rs

parent f160d992
......@@ -17,7 +17,6 @@ use electrs::{
daemon::Daemon,
errors::*,
index::Index,
metrics::Metrics,
query::Query,
rpc::RPC,
signal::Waiter,
......@@ -26,8 +25,6 @@ use electrs::{
fn run_server(config: &Config) -> Result<()> {
let signal = Waiter::start();
let metrics = Metrics::new(config.monitoring_addr);
metrics.start();
let blocktxids_cache = Arc::new(BlockTxIDsCache::new(config.blocktxids_cache_size));
let daemon = Daemon::new(
......@@ -49,7 +46,7 @@ fn run_server(config: &Config) -> Result<()> {
} else {
// faster, but uses more memory
let store =
bulk::index_blk_files(&daemon, config.bulk_index_threads, &metrics, &signal, store)?;
bulk::index_blk_files(&daemon, config.bulk_index_threads, &signal, store)?;
let store = full_compaction(store);
index.reload(&store); // make sure the block header index is up-to-date
store
......
......@@ -16,47 +16,26 @@ use std::thread;
use crate::daemon::Daemon;
use crate::errors::*;
use crate::index::{index_block, last_indexed_block, read_indexed_blockhashes};
use crate::metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::signal::Waiter;
use crate::store::{DBStore, Row, WriteStore};
use crate::util::{spawn_thread, HeaderList, SyncChannel};
struct Parser {
magic: u32,
current_headers: HeaderList,
indexed_blockhashes: Mutex<HashSet<Sha256dHash>>,
// metrics
duration: HistogramVec,
block_count: CounterVec,
bytes_read: Histogram,
}
impl Parser {
fn new(
daemon: &Daemon,
metrics: &Metrics,
indexed_blockhashes: HashSet<Sha256dHash>,
) -> Result<Arc<Parser>> {
Ok(Arc::new(Parser {
magic: daemon.magic(),
current_headers: load_headers(daemon)?,
indexed_blockhashes: Mutex::new(indexed_blockhashes),
duration: metrics.histogram_vec(
HistogramOpts::new(
"electrs_parse_duration",
"blk*.dat parsing duration (in seconds)",
),
&["step"],
),
block_count: metrics.counter_vec(
MetricOpts::new("electrs_parse_blocks", "# of block parsed (from blk*.dat)"),
&["type"],
),
bytes_read: metrics.histogram(HistogramOpts::new(
"electrs_parse_bytes_read",
"# of bytes read (from blk*.dat)",
)),
}))
}
......@@ -74,44 +53,28 @@ impl Parser {
}
fn read_blkfile(&self, path: &Path) -> Result<Vec<u8>> {
let timer = self.duration.with_label_values(&["read"]).start_timer();
let blob = fs::read(&path).chain_err(|| format!("failed to read {:?}", path))?;
timer.observe_duration();
self.bytes_read.observe(blob.len() as f64);
Ok(blob)
}
fn index_blkfile(&self, blob: Vec<u8>) -> Result<Vec<Row>> {
let timer = self.duration.with_label_values(&["parse"]).start_timer();
let blocks = parse_blocks(blob, self.magic)?;
timer.observe_duration();
let mut rows = Vec::<Row>::new();
let timer = self.duration.with_label_values(&["index"]).start_timer();
for block in blocks {
let blockhash = block.bitcoin_hash();
if let Some(header) = self.current_headers.header_by_blockhash(&blockhash) {
if self
.indexed_blockhashes
if self.indexed_blockhashes
.lock()
.expect("indexed_blockhashes")
.insert(blockhash)
{
rows.extend(index_block(&block, header.height()));
self.block_count.with_label_values(&["indexed"]).inc();
} else {
self.block_count.with_label_values(&["duplicate"]).inc();
}
} else {
// will be indexed later (after bulk load is over) if not an orphan block
self.block_count.with_label_values(&["skipped"]).inc();
}
}
timer.observe_duration();
let timer = self.duration.with_label_values(&["sort"]).start_timer();
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
timer.observe_duration();
Ok(rows)
}
}
......@@ -120,8 +83,10 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<Block>> {
let mut cursor = Cursor::new(&blob);
let mut blocks = vec![];
let max_pos = blob.len() as u64;
while cursor.position() < max_pos {
let offset = cursor.position();
match u32::consensus_decode(&mut cursor) {
Ok(value) => {
if magic != value {
......@@ -131,6 +96,7 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<Block>> {
}
Err(_) => break, // EOF
};
let block_size = u32::consensus_decode(&mut cursor).chain_err(|| "no block size")?;
let start = cursor.position();
let end = start + block_size as u64;
......@@ -148,11 +114,14 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<Block>> {
}
Err(_) => break, // EOF
}
let block: Block = deserialize(&blob[start as usize..end as usize])
.chain_err(|| format!("failed to parse block at {}..{}", start, end))?;
blocks.push(block);
cursor.set_position(end as u64);
}
Ok(blocks)
}
......@@ -225,22 +194,28 @@ fn start_indexer(
pub fn index_blk_files(
daemon: &Daemon,
index_threads: usize,
metrics: &Metrics,
signal: &Waiter,
store: DBStore,
) -> Result<DBStore> {
set_open_files_limit(2048); // twice the default `ulimit -n` value
let blk_files = daemon.list_blk_files()?;
info!("indexing {} blk*.dat files", blk_files.len());
let indexed_blockhashes = read_indexed_blockhashes(&store);
debug!("found {} indexed blocks", indexed_blockhashes.len());
let parser = Parser::new(daemon, metrics, indexed_blockhashes)?;
let parser = Parser::new(daemon, indexed_blockhashes)?;
let (blobs, reader) = start_reader(blk_files, parser.clone());
let rows_chan = SyncChannel::new(0);
let indexers: Vec<JoinHandle> = (0..index_threads)
.map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
.collect();
let signal = signal.clone();
spawn_thread("bulk_writer", move || -> Result<DBStore> {
for (rows, path) in rows_chan.into_receiver() {
trace!("indexed {:?}: {} rows", path, rows.len());
......@@ -249,6 +224,7 @@ pub fn index_blk_files(
.poll()
.chain_err(|| "stopping bulk indexing due to signal")?;
}
reader
.join()
.expect("reader panicked")
......@@ -259,6 +235,7 @@ pub fn index_blk_files(
.expect("indexer panicked")
.expect("indexing failed")
});
store.write(vec![parser.last_indexed_row()]);
Ok(store)
})
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment