Unverified Commit c41bc378 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Add byte-size limit to transaction & block txids' caches

Also, move TransactionCache into `cache` module.

Following https://github.com/romanz/electrs/pull/161

 by @dagurval.
Co-authored-by: default avatarDagur Valberg Johannsson <dagurval@pvv.ntnu.no>
parent 61a5af3a
......@@ -74,16 +74,16 @@ doc = "Number of threads used for bulk indexing (default: use the # of CPUs)"
default = "0"
[[param]]
name = "tx_cache_size"
type = "usize"
doc = "Number of transactions to keep in for query LRU cache"
default = "10000"
name = "tx_cache_size_mb"
type = "f32"
doc = "Total size of transactions to cache (MB)"
default = "10.0"
[[param]]
name = "blocktxids_cache_size"
type = "usize"
doc = "Number of blocks to cache transactions IDs in LRU cache"
default = "100"
name = "blocktxids_cache_size_mb"
type = "f32"
doc = "Total size of block transactions IDs to cache (in MB)"
default = "10.0"
[[param]]
name = "txid_limit"
......
......@@ -12,13 +12,13 @@ use std::time::Duration;
use electrs::{
app::App,
bulk,
cache::BlockTxIDsCache,
cache::{BlockTxIDsCache, TransactionCache},
config::Config,
daemon::Daemon,
errors::*,
index::Index,
metrics::Metrics,
query::{Query, TransactionCache},
query::Query,
rpc::RPC,
signal::Waiter,
store::{full_compaction, is_fully_compacted, DBStore},
......@@ -58,7 +58,7 @@ fn run_server(config: &Config) -> Result<()> {
.enable_compaction(); // enable auto compactions before starting incremental index updates.
let app = App::new(store, index, daemon, &config)?;
let tx_cache = TransactionCache::new(config.tx_cache_size);
let tx_cache = TransactionCache::new(config.tx_cache_size, &metrics);
let query = Query::new(app.clone(), &metrics, tx_cache, config.txid_limit);
let mut server = None; // Electrum RPC server
......
use crate::errors::*;
use crate::metrics::{Counter, MetricOpts, Metrics};
use crate::metrics::{CounterVec, MetricOpts, Metrics};
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode::deserialize;
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use lru::LruCache;
use std::hash::Hash;
use std::sync::Mutex;
struct SizedLruCache<K, V> {
map: LruCache<K, (V, usize)>,
bytes_usage: usize,
bytes_capacity: usize,
lookups: CounterVec,
}
impl<K: Hash + Eq, V> SizedLruCache<K, V> {
fn new(bytes_capacity: usize, lookups: CounterVec) -> SizedLruCache<K, V> {
SizedLruCache {
map: LruCache::unbounded(),
bytes_usage: 0,
bytes_capacity,
lookups,
}
}
fn get(&mut self, key: &K) -> Option<&V> {
match self.map.get(key) {
None => {
self.lookups.with_label_values(&["miss"]).inc();
None
}
Some((value, _)) => {
self.lookups.with_label_values(&["hit"]).inc();
Some(value)
}
}
}
fn put(&mut self, key: K, value: V, byte_size: usize) {
if byte_size > self.bytes_capacity {
return;
}
if let Some((_, popped_size)) = self.map.put(key, (value, byte_size)) {
self.bytes_usage -= popped_size
}
self.bytes_usage += byte_size;
while self.bytes_usage > self.bytes_capacity {
match self.map.pop_lru() {
Some((_, (_, popped_size))) => self.bytes_usage -= popped_size,
None => return,
}
}
}
}
pub struct BlockTxIDsCache {
map: Mutex<LruCache<Sha256dHash /* blockhash */, Vec<Sha256dHash /* txid */>>>,
hits: Counter,
misses: Counter,
map: Mutex<SizedLruCache<Sha256dHash /* blockhash */, Vec<Sha256dHash /* txid */>>>,
}
impl BlockTxIDsCache {
pub fn new(capacity: usize, metrics: &Metrics) -> BlockTxIDsCache {
pub fn new(bytes_capacity: usize, metrics: &Metrics) -> BlockTxIDsCache {
let lookups = metrics.counter_vec(
MetricOpts::new(
"electrs_blocktxids_cache",
"# of cache lookups for list of transactions in a block",
),
&["type"],
);
BlockTxIDsCache {
map: Mutex::new(LruCache::new(capacity)),
hits: metrics.counter(MetricOpts::new(
"electrs_blocktxids_cache_hits",
"# of cache hits for list of transactions in a block",
)),
misses: metrics.counter(MetricOpts::new(
"electrs_blocktxids_cache_misses",
"# of cache misses for list of transactions in a block",
)),
map: Mutex::new(SizedLruCache::new(bytes_capacity, lookups)),
}
}
......@@ -34,29 +83,111 @@ impl BlockTxIDsCache {
F: FnOnce() -> Result<Vec<Sha256dHash>>,
{
if let Some(txids) = self.map.lock().unwrap().get(blockhash) {
self.hits.inc();
return Ok(txids.clone());
}
self.misses.inc();
let txids = load_txids_func()?;
self.map.lock().unwrap().put(*blockhash, txids.clone());
let byte_size = 32 /* hash size */ * (1 /* key */ + txids.len() /* values */);
self.map
.lock()
.unwrap()
.put(*blockhash, txids.clone(), byte_size);
Ok(txids)
}
}
pub struct TransactionCache {
// Store serialized transaction (should use less RAM).
map: Mutex<SizedLruCache<Sha256dHash, Vec<u8>>>,
}
impl TransactionCache {
pub fn new(bytes_capacity: usize, metrics: &Metrics) -> TransactionCache {
let lookups = metrics.counter_vec(
MetricOpts::new(
"electrs_transactions_cache",
"# of cache lookups for transactions",
),
&["type"],
);
TransactionCache {
map: Mutex::new(SizedLruCache::new(bytes_capacity, lookups)),
}
}
pub fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
where
F: FnOnce() -> Result<Vec<u8>>,
{
match self.map.lock().unwrap().get(txid) {
Some(serialized_txn) => {
return Ok(deserialize(&serialized_txn).chain_err(|| "failed to parse cached tx")?);
}
None => {}
}
let serialized_txn = load_txn_func()?;
let txn = deserialize(&serialized_txn).chain_err(|| "failed to parse serialized tx")?;
let byte_size = 32 /* key (hash size) */ + serialized_txn.len();
self.map
.lock()
.unwrap()
.put(*txid, serialized_txn, byte_size);
Ok(txn)
}
}
#[cfg(test)]
mod tests {
use super::*;
use bitcoin_hashes::Hash;
#[test]
fn test_sized_lru_cache_hit_and_miss() {
let counter = CounterVec::new(prometheus::Opts::new("name", "help"), &["type"]).unwrap();
let mut cache = SizedLruCache::<i8, i32>::new(100, counter.clone());
assert_eq!(counter.with_label_values(&["miss"]).get(), 0);
assert_eq!(counter.with_label_values(&["hit"]).get(), 0);
assert_eq!(cache.get(&1), None); // no such key
assert_eq!(counter.with_label_values(&["miss"]).get(), 1);
assert_eq!(counter.with_label_values(&["hit"]).get(), 0);
cache.put(1, 10, 50); // add new key-value
assert_eq!(cache.get(&1), Some(&10));
assert_eq!(counter.with_label_values(&["miss"]).get(), 1);
assert_eq!(counter.with_label_values(&["hit"]).get(), 1);
cache.put(3, 30, 50); // drop oldest key (1)
cache.put(2, 20, 50);
assert_eq!(cache.get(&1), None);
assert_eq!(cache.get(&2), Some(&20));
assert_eq!(cache.get(&3), Some(&30));
assert_eq!(counter.with_label_values(&["miss"]).get(), 2);
assert_eq!(counter.with_label_values(&["hit"]).get(), 3);
cache.put(3, 33, 50); // replace existing value
assert_eq!(cache.get(&1), None);
assert_eq!(cache.get(&2), Some(&20));
assert_eq!(cache.get(&3), Some(&33));
assert_eq!(counter.with_label_values(&["miss"]).get(), 3);
assert_eq!(counter.with_label_values(&["hit"]).get(), 5);
cache.put(9, 90, 9999); // larger than cache capacity, don't drop the cache
assert_eq!(cache.get(&1), None);
assert_eq!(cache.get(&2), Some(&20));
assert_eq!(cache.get(&3), Some(&33));
assert_eq!(cache.get(&9), None);
assert_eq!(counter.with_label_values(&["miss"]).get(), 5);
assert_eq!(counter.with_label_values(&["hit"]).get(), 7);
}
fn gen_hash(seed: u8) -> Sha256dHash {
let bytes: Vec<u8> = (seed..seed + 32).collect();
Sha256dHash::hash(&bytes[..])
}
#[test]
fn test_cache_hit_and_miss() {
fn test_blocktxids_cache_hit_and_miss() {
let block1 = gen_hash(1);
let block2 = gen_hash(2);
let block3 = gen_hash(3);
......@@ -69,7 +200,8 @@ mod tests {
};
let dummy_metrics = Metrics::new("127.0.0.1:60000".parse().unwrap());
let cache = BlockTxIDsCache::new(2, &dummy_metrics);
// 200 bytes ~ 32 (bytes/hash) * (1 key hash + 2 value hashes) * 2 txns
let cache = BlockTxIDsCache::new(200, &dummy_metrics);
// cache miss
let result = cache.get_or_else(&block1, &miss_func).unwrap();
......@@ -81,7 +213,7 @@ mod tests {
assert_eq!(1, *misses.lock().unwrap());
assert_eq!(txids, result);
// cache size is 2, test that blockhash1 falls out of cache
// cache size is 200, test that blockhash1 falls out of cache
cache.get_or_else(&block2, &miss_func).unwrap();
assert_eq!(2, *misses.lock().unwrap());
cache.get_or_else(&block3, &miss_func).unwrap();
......@@ -94,4 +226,36 @@ mod tests {
cache.get_or_else(&block1, &miss_func).unwrap();
assert_eq!(4, *misses.lock().unwrap());
}
#[test]
fn test_txn_cache() {
use bitcoin::util::hash::BitcoinHash;
use hex;
let dummy_metrics = Metrics::new("127.0.0.1:60000".parse().unwrap());
let cache = TransactionCache::new(1024, &dummy_metrics);
let tx_bytes = hex::decode("0100000001a15d57094aa7a21a28cb20b59aab8fc7d1149a3bdbcddba9c622e4f5f6a99ece010000006c493046022100f93bb0e7d8db7bd46e40132d1f8242026e045f03a0efe71bbb8e3f475e970d790221009337cd7f1f929f00cc6ff01f03729b069a7c21b59b1736ddfee5db5946c5da8c0121033b9b137ee87d5a812d6f506efdd37f0affa7ffc310711c06c7f3e097c9447c52ffffffff0100e1f505000000001976a9140389035a9225b3839e2bbf32d826a1e222031fd888ac00000000").unwrap();
let tx: Transaction = deserialize(&tx_bytes).unwrap();
let txid = tx.bitcoin_hash();
let mut misses = 0;
assert_eq!(
cache
.get_or_else(&txid, || {
misses += 1;
Ok(tx_bytes.clone())
})
.unwrap(),
tx
);
assert_eq!(misses, 1);
assert_eq!(
cache
.get_or_else(&txid, || panic!("should not be called"))
.unwrap(),
tx
);
assert_eq!(misses, 1);
}
}
......@@ -234,6 +234,7 @@ impl Config {
if config.bulk_index_threads == 0 {
config.bulk_index_threads = num_cpus::get();
}
const MB: f32 = (1 << 20) as f32;
let config = Config {
log,
network_type: config.network,
......@@ -246,8 +247,8 @@ impl Config {
jsonrpc_import: config.jsonrpc_import,
index_batch_size: config.index_batch_size,
bulk_index_threads: config.bulk_index_threads,
tx_cache_size: config.tx_cache_size,
blocktxids_cache_size: config.blocktxids_cache_size,
tx_cache_size: (config.tx_cache_size_mb * MB) as usize,
blocktxids_cache_size: (config.blocktxids_cache_size_mb * MB) as usize,
txid_limit: config.txid_limit,
server_banner: config.server_banner,
};
......
......@@ -5,12 +5,12 @@ use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use bitcoin_hashes::Hash;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use lru::LruCache;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use crate::app::App;
use crate::cache::TransactionCache;
use crate::errors::*;
use crate::index::{compute_script_hash, TxInRow, TxOutRow, TxRow};
use crate::mempool::Tracker;
......@@ -177,30 +177,6 @@ fn txids_by_funding_output(
.collect()
}
pub struct TransactionCache {
map: Mutex<LruCache<Sha256dHash, Transaction>>,
}
impl TransactionCache {
pub fn new(capacity: usize) -> TransactionCache {
TransactionCache {
map: Mutex::new(LruCache::new(capacity)),
}
}
fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
where
F: FnOnce() -> Result<Transaction>,
{
if let Some(txn) = self.map.lock().unwrap().get(txid) {
return Ok(txn.clone());
}
let txn = load_txn_func()?;
self.map.lock().unwrap().put(*txid, txn.clone());
Ok(txn)
}
}
pub struct Query {
app: Arc<App>,
tracker: RwLock<Tracker>,
......@@ -378,7 +354,12 @@ impl Query {
fn load_txn(&self, txid: &Sha256dHash, block_height: Option<u32>) -> Result<Transaction> {
self.tx_cache.get_or_else(&txid, || {
let blockhash = self.lookup_confirmed_blockhash(txid, block_height)?;
self.app.daemon().gettransaction(txid, blockhash)
let value: Value = self
.app
.daemon()
.gettransaction_raw(txid, blockhash, /*verbose*/ false)?;
let value_hex: &str = value.as_str().chain_err(|| "non-string tx")?;
hex::decode(&value_hex).chain_err(|| "non-hex tx")
})
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment