Commit de6ec832 authored by kenshin-samourai's avatar kenshin-samourai
Browse files

remove transaction cache

remove calls to bitcoind rpc api from query.js (store output vout in db)
parent 80c10659
......@@ -12,7 +12,7 @@ use std::time::Duration;
use electrs::{
app::App,
bulk,
cache::{BlockTxIDsCache, TransactionCache},
cache::BlockTxIDsCache,
config::Config,
daemon::Daemon,
errors::*,
......@@ -54,8 +54,7 @@ fn run_server(config: &Config) -> Result<()> {
.enable_compaction(); // enable auto compactions before starting incremental index updates.
let app = App::new(store, index, daemon)?;
let tx_cache = TransactionCache::new(config.tx_cache_size);
let query = Query::new(app.clone(), tx_cache, config.txid_limit);
let query = Query::new(app.clone(), config.txid_limit);
let mut server = None; // Electrum RPC server
loop {
......
use crate::errors::*;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode::deserialize;
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use lru::LruCache;
use std::hash::Hash;
......@@ -80,39 +78,6 @@ impl BlockTxIDsCache {
}
}
/// In-memory LRU cache of raw transactions, keyed by txid and bounded by
/// the total byte size of the stored entries (see `SizedLruCache`).
pub struct TransactionCache {
// Store serialized transaction (should use less RAM).
map: Mutex<SizedLruCache<Sha256dHash, Vec<u8>>>,
}
impl TransactionCache {
    /// Creates a cache whose entries may occupy at most `bytes_capacity`
    /// bytes in total.
    pub fn new(bytes_capacity: usize) -> TransactionCache {
        let inner = SizedLruCache::new(bytes_capacity);
        TransactionCache {
            map: Mutex::new(inner),
        }
    }

    /// Returns the transaction for `txid`, serving it from the cache when
    /// present and otherwise loading the raw bytes via `load_txn_func`,
    /// caching them for subsequent lookups.
    pub fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
    where
        F: FnOnce() -> Result<Vec<u8>>,
    {
        // Fast path: entry already cached; deserialize and return it.
        if let Some(cached_bytes) = self.map.lock().unwrap().get(txid) {
            return deserialize(&cached_bytes).chain_err(|| "failed to parse cached tx");
        }
        // Slow path: invoke the loader, then validate by deserializing
        // before the bytes are inserted into the cache.
        let raw_txn = load_txn_func()?;
        let parsed = deserialize(&raw_txn).chain_err(|| "failed to parse serialized tx")?;
        // Charge the entry for its 32-byte key (hash size) plus payload.
        let entry_size = 32 + raw_txn.len();
        self.map.lock().unwrap().put(*txid, raw_txn, entry_size);
        Ok(parsed)
    }
}
#[cfg(test)]
mod tests {
use super::*;
......@@ -189,35 +154,4 @@ mod tests {
cache.get_or_else(&block1, &miss_func).unwrap();
assert_eq!(4, *misses.lock().unwrap());
}
#[test]
fn test_txn_cache() {
    use bitcoin::util::hash::BitcoinHash;
    use hex;

    let cache = TransactionCache::new(1024);
    let tx_bytes = hex::decode("0100000001a15d57094aa7a21a28cb20b59aab8fc7d1149a3bdbcddba9c622e4f5f6a99ece010000006c493046022100f93bb0e7d8db7bd46e40132d1f8242026e045f03a0efe71bbb8e3f475e970d790221009337cd7f1f929f00cc6ff01f03729b069a7c21b59b1736ddfee5db5946c5da8c0121033b9b137ee87d5a812d6f506efdd37f0affa7ffc310711c06c7f3e097c9447c52ffffffff0100e1f505000000001976a9140389035a9225b3839e2bbf32d826a1e222031fd888ac00000000").unwrap();
    let tx: Transaction = deserialize(&tx_bytes).unwrap();
    let txid = tx.bitcoin_hash();

    // First lookup misses: the loader must run exactly once.
    let mut load_count = 0;
    let first = cache
        .get_or_else(&txid, || {
            load_count += 1;
            Ok(tx_bytes.clone())
        })
        .unwrap();
    assert_eq!(first, tx);
    assert_eq!(load_count, 1);

    // Second lookup hits the cache: the loader must not be invoked.
    let second = cache
        .get_or_else(&txid, || panic!("should not be called"))
        .unwrap();
    assert_eq!(second, tx);
    assert_eq!(load_count, 1);
}
}
......@@ -75,16 +75,18 @@ pub struct TxOutKey {
pub struct TxOutRow {
key: TxOutKey,
pub txid_prefix: HashPrefix,
pub vout: u16,
}
impl TxOutRow {
pub fn new(txid: &Sha256dHash, output: &TxOut) -> TxOutRow {
pub fn new(txid: &Sha256dHash, vout: u32, output: &TxOut) -> TxOutRow {
TxOutRow {
key: TxOutKey {
code: b'O',
script_hash_prefix: hash_prefix(&compute_script_hash(&output.script_pubkey[..])),
},
txid_prefix: hash_prefix(&txid[..]),
vout: vout as u16,
}
}
......@@ -181,10 +183,12 @@ pub fn index_transaction<'a>(
Some(TxInRow::new(&txid, &input).to_row())
}
});
let outputs = txn
.output
.iter()
.map(move |output| TxOutRow::new(&txid, &output).to_row());
.enumerate()
.map(move |(vout, output)| TxOutRow::new(&txid, vout as u32, &output).to_row());
// Persist transaction ID and confirmed height
inputs
......
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode::deserialize;
use bitcoin_hashes::hex::ToHex;
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use crate::app::App;
use crate::cache::TransactionCache;
use crate::errors::*;
use crate::index::{compute_script_hash, TxInRow, TxOutRow, TxRow};
use crate::index::{TxInRow, TxOutRow, TxRow};
use crate::mempool::Tracker;
use crate::store::{ReadStore, Row};
use crate::util::{FullHash, HashPrefix, HeaderEntry};
use crate::store::ReadStore;
use crate::util::HashPrefix;
pub struct FundingOutput {
pub txn_id: Sha256dHash,
......@@ -57,37 +52,17 @@ impl Status {
txns.sort_unstable();
txns
}
/// Computes a SHA256 digest over the script's history, feeding each
/// entry as the text "txid:height:". Returns `None` for an empty history.
pub fn hash(&self) -> Option<FullHash> {
    let history = self.history();
    if history.is_empty() {
        return None;
    }
    let mut hasher = Sha256::new();
    for (height, txn_id) in history {
        let part = format!("{}:{}:", txn_id.to_hex(), height);
        hasher.input(part.as_bytes());
    }
    let mut digest = FullHash::default();
    hasher.result(&mut digest);
    Some(digest)
}
}
/// A confirmed transaction reference paired with its block height.
struct TxnHeight {
// full transaction body
txn: Transaction,
// transaction id
txn_id: Sha256dHash,
// confirmation height
height: u32,
}
// TODO: the functions below can be part of ReadStore.
/// Fetches the full `TxRow` for `txid` from the store, or `None` when the
/// transaction is not indexed.
fn txrow_by_txid(store: &dyn ReadStore, txid: &Sha256dHash) -> Option<TxRow> {
    let key = TxRow::filter_full(&txid);
    store
        .get(&key)
        .map(|value| TxRow::from_row(&Row { key, value }))
}
fn txrows_by_prefix(store: &dyn ReadStore, txid_prefix: HashPrefix) -> Vec<TxRow> {
fn txrows_by_prefix(
store: &dyn ReadStore,
txid_prefix: HashPrefix
) -> Vec<TxRow> {
store
.scan(&TxRow::filter_prefix(txid_prefix))
.iter()
......@@ -95,11 +70,14 @@ fn txrows_by_prefix(store: &dyn ReadStore, txid_prefix: HashPrefix) -> Vec<TxRow
.collect()
}
fn txids_by_script_hash(store: &dyn ReadStore, script_hash: &[u8]) -> Vec<HashPrefix> {
fn txoutrows_by_script_hash(
store: &dyn ReadStore,
script_hash: &[u8]
) -> Vec<TxOutRow> {
store
.scan(&TxOutRow::filter(script_hash))
.iter()
.map(|row| TxOutRow::from_row(row).txid_prefix)
.map(|row| TxOutRow::from_row(row))
.collect()
}
......@@ -118,20 +96,17 @@ fn txids_by_funding_output(
pub struct Query {
app: Arc<App>,
tracker: RwLock<Tracker>,
tx_cache: TransactionCache,
txid_limit: usize,
}
impl Query {
pub fn new(
app: Arc<App>,
tx_cache: TransactionCache,
txid_limit: usize,
) -> Arc<Query> {
Arc::new(Query {
app,
tracker: RwLock::new(Tracker::new()),
tx_cache,
txid_limit,
})
}
......@@ -145,9 +120,8 @@ impl Query {
for txid_prefix in prefixes {
for tx_row in txrows_by_prefix(store, txid_prefix) {
let txid: Sha256dHash = deserialize(&tx_row.key.txid).unwrap();
let txn = self.load_txn(&txid, Some(tx_row.height))?;
txns.push(TxnHeight {
txn,
txn_id: txid,
height: tx_row.height,
})
}
......@@ -160,28 +134,23 @@ impl Query {
store: &dyn ReadStore,
funding: &FundingOutput,
) -> Result<Option<SpendingInput>> {
let spending_txns: Vec<TxnHeight> = self.load_txns_by_prefix(
store,
txids_by_funding_output(store, &funding.txn_id, funding.output_index),
)?;
let mut spending_inputs = vec![];
let prefixes = txids_by_funding_output(store, &funding.txn_id, funding.output_index);
let spending_txns = self.load_txns_by_prefix(store, prefixes)?;
for t in &spending_txns {
for input in t.txn.input.iter() {
if input.previous_output.txid == funding.txn_id
&& input.previous_output.vout == funding.output_index as u32
{
spending_inputs.push(SpendingInput {
txn_id: t.txn.txid(),
height: t.height,
funding_output: (funding.txn_id, funding.output_index),
})
}
if t.txn_id == funding.txn_id {
spending_inputs.push(SpendingInput {
txn_id: t.txn_id,
height: t.height,
funding_output: (funding.txn_id, funding.output_index),
})
}
}
assert!(spending_inputs.len() <= 1);
Ok(if spending_inputs.len() == 1 {
Some(spending_inputs.remove(0))
} else {
......@@ -189,19 +158,27 @@ impl Query {
})
}
fn find_funding_outputs(&self, t: &TxnHeight, script_hash: &[u8]) -> Vec<FundingOutput> {
fn find_funding_outputs(
&self,
store: &dyn ReadStore,
script_hash: &[u8]
) -> Result<Vec<FundingOutput>> {
let txout_rows = txoutrows_by_script_hash(store, script_hash);
let mut result = vec![];
let txn_id = t.txn.txid();
for (index, output) in t.txn.output.iter().enumerate() {
if compute_script_hash(&output.script_pubkey[..]) == script_hash {
for row in &txout_rows {
let funding_txns = self.load_txns_by_prefix(store, vec![row.txid_prefix])?;
for t in &funding_txns {
result.push(FundingOutput {
txn_id,
txn_id: t.txn_id,
height: t.height,
output_index: index,
output_index: row.vout as usize,
})
}
}
result
Ok(result)
}
fn confirmed_status(
......@@ -211,22 +188,16 @@ impl Query {
let mut funding = vec![];
let mut spending = vec![];
let read_store = self.app.read_store();
let txid_prefixes = txids_by_script_hash(read_store, script_hash);
// if the limit is enabled
if self.txid_limit > 0 && txid_prefixes.len() > self.txid_limit {
bail!(
"{}+ transactions found, query may take a long time",
txid_prefixes.len()
);
}
for t in self.load_txns_by_prefix(read_store, txid_prefixes)? {
funding.extend(self.find_funding_outputs(&t, script_hash));
}
let funding_outputs = self.find_funding_outputs(read_store, script_hash)?;
funding.extend(funding_outputs);
for funding_output in &funding {
if let Some(spent) = self.find_spending_input(read_store, &funding_output)? {
spending.push(spent);
}
}
Ok((funding, spending))
}
......@@ -238,16 +209,16 @@ impl Query {
let mut funding = vec![];
let mut spending = vec![];
let tracker = self.tracker.read().unwrap();
let txid_prefixes = txids_by_script_hash(tracker.index(), script_hash);
for t in self.load_txns_by_prefix(tracker.index(), txid_prefixes)? {
funding.extend(self.find_funding_outputs(&t, script_hash));
}
// // TODO: dedup outputs (somehow) both confirmed and in mempool (e.g. reorg?)
let funding_outputs = self.find_funding_outputs(tracker.index(), script_hash)?;
funding.extend(funding_outputs);
for funding_output in funding.iter().chain(confirmed_funding.iter()) {
if let Some(spent) = self.find_spending_input(tracker.index(), &funding_output)? {
spending.push(spent);
}
}
Ok((funding, spending))
}
......@@ -263,46 +234,6 @@ impl Query {
Ok(Status { confirmed, mempool })
}
/// Resolves the hash of the block that confirmed `tx_hash`.
///
/// Returns `Ok(None)` when the transaction is currently tracked in the
/// mempool (unconfirmed). Otherwise the height comes from `block_height`
/// when the caller supplies it, or from the on-disk tx index, and the
/// header at that height yields the block hash.
fn lookup_confirmed_blockhash(
&self,
tx_hash: &Sha256dHash,
block_height: Option<u32>,
) -> Result<Option<Sha256dHash>> {
let blockhash = if self.tracker.read().unwrap().get_txn(&tx_hash).is_some() {
None // found in mempool (as unconfirmed transaction)
} else {
// Lookup in confirmed transactions' index
let height = match block_height {
Some(height) => height,
None => {
// Caller did not know the height: consult the store's tx row.
txrow_by_txid(self.app.read_store(), &tx_hash)
.chain_err(|| format!("not indexed tx {}", tx_hash))?
.height
}
};
let header = self
.app
.index()
.get_header(height as usize)
.chain_err(|| format!("missing header at height {}", height))?;
Some(*header.hash())
};
Ok(blockhash)
}
// Internal API for transaction retrieval
/// Loads the transaction `txid`, serving it from `tx_cache` when possible.
/// On a cache miss, the containing block hash is resolved (None => mempool)
/// and the raw transaction hex is fetched from bitcoind via
/// `gettransaction_raw`, then decoded and cached.
fn load_txn(&self, txid: &Sha256dHash, block_height: Option<u32>) -> Result<Transaction> {
self.tx_cache.get_or_else(&txid, || {
// Cache miss: ask the daemon for the raw (non-verbose) tx hex.
let blockhash = self.lookup_confirmed_blockhash(txid, block_height)?;
let value: Value = self
.app
.daemon()
.gettransaction_raw(txid, blockhash, /*verbose*/ false)?;
let value_hex: &str = value.as_str().chain_err(|| "non-string tx")?;
hex::decode(&value_hex).chain_err(|| "non-hex tx")
})
}
/// Refreshes the mempool tracker from the daemon's current state.
pub fn update_mempool(&self) -> Result<()> {
    let mut tracker = self.tracker.write().unwrap();
    tracker.update(self.app.daemon())
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment