Unverified Commit b69219a5 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Pass application state and query via Arc

(consider Tokio futures)
parent 375bc030
use argparse::{ArgumentParser, StoreTrue};
use bitcoin::util::hash::Sha256dHash;
use crossbeam;
use std::fs::OpenOptions;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
......@@ -49,6 +51,30 @@ impl Config {
}
}
pub struct App {
store: store::DBStore,
index: index::Index,
daemon: daemon::Daemon,
}
impl App {
pub fn store(&self) -> &store::DBStore {
&self.store
}
pub fn index(&self) -> &index::Index {
&self.index
}
pub fn daemon(&self) -> &daemon::Daemon {
&self.daemon
}
fn update_index(&self, mut tip: Sha256dHash) -> Sha256dHash {
if tip != self.daemon.getbestblockhash().unwrap() {
tip = self.index.update(&self.store, &self.daemon);
}
tip
}
}
fn run_server(config: &Config) {
let index = index::Index::new();
let daemon = daemon::Daemon::new(config.network_type);
......@@ -65,17 +91,20 @@ fn run_server(config: &Config) {
drop(store); // to be re-opened soon
let store = store::DBStore::open(config.db_path, store::StoreOptions { auto_compact: true });
let query = query::Query::new(&store, &daemon, &index);
let app = Arc::new(App {
store,
index,
daemon,
});
let query = Arc::new(query::Query::new(app.clone()));
crossbeam::scope(|scope| {
let poll_delay = Duration::from_secs(5);
scope.spawn(|| rpc::serve(&config.rpc_addr, &query));
scope.spawn(|| rpc::serve(&config.rpc_addr, query.clone()));
loop {
thread::sleep(poll_delay);
query.update_mempool().unwrap();
if tip != daemon.getbestblockhash().unwrap() {
tip = index.update(&store, &daemon);
}
tip = app.update_index(tip);
}
});
}
......
......@@ -5,10 +5,10 @@ use bitcoin::util::hash::Sha256dHash;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use daemon::Daemon;
use index::{compute_script_hash, Index, TxInRow, TxOutRow, TxRow};
use app::App;
use index::{compute_script_hash, TxInRow, TxOutRow, TxRow};
use mempool::Tracker;
use store::ReadStore;
use util::{FullHash, HashPrefix, HeaderEntry};
......@@ -125,27 +125,19 @@ fn txids_by_funding_output(
.collect()
}
pub struct Query<'a> {
daemon: &'a Daemon,
index: &'a Index,
index_store: &'a ReadStore, // TODO: should be a part of index
pub struct Query {
app: Arc<App>,
tracker: RwLock<Tracker>,
}
impl<'a> Query<'a> {
pub fn new(index_store: &'a ReadStore, daemon: &'a Daemon, index: &'a Index) -> Query<'a> {
impl Query {
pub fn new(app: Arc<App>) -> Query {
Query {
daemon,
index,
index_store,
app,
tracker: RwLock::new(Tracker::new()),
}
}
pub fn daemon(&self) -> &Daemon {
self.daemon
}
fn load_txns(&self, store: &ReadStore, prefixes: Vec<HashPrefix>) -> Vec<TxnHeight> {
let mut txns = Vec::new();
for txid_prefix in prefixes {
......@@ -212,13 +204,13 @@ impl<'a> Query<'a> {
let mut funding = vec![];
let mut spending = vec![];
for t in self.load_txns(
self.index_store,
txids_by_script_hash(self.index_store, script_hash),
self.app.store(),
txids_by_script_hash(self.app.store(), script_hash),
) {
funding.extend(self.find_funding_outputs(&t, script_hash));
}
for funding_output in &funding {
if let Some(spent) = self.find_spending_input(self.index_store, &funding_output) {
if let Some(spent) = self.find_spending_input(self.app.store(), &funding_output) {
spending.push(spent);
}
}
......@@ -255,13 +247,14 @@ impl<'a> Query<'a> {
}
pub fn get_tx(&self, tx_hash: &Sha256dHash) -> Transaction {
self.daemon
self.app
.daemon()
.gettransaction(tx_hash)
.expect(&format!("failed to load tx {}", tx_hash))
}
pub fn get_headers(&self, heights: &[usize]) -> Vec<BlockHeader> {
let headers_list = self.index.headers_list();
let headers_list = self.app.index().headers_list();
let headers = headers_list.headers();
let mut result = Vec::new();
for height in heights {
......@@ -275,7 +268,7 @@ impl<'a> Query<'a> {
}
pub fn get_best_header(&self) -> Option<HeaderEntry> {
let header_list = self.index.headers_list();
let header_list = self.app.index().headers_list();
Some(header_list.headers().last()?.clone())
}
......@@ -284,9 +277,9 @@ impl<'a> Query<'a> {
tx_hash: &Sha256dHash,
height: usize,
) -> Option<(Vec<Sha256dHash>, usize)> {
let header_list = self.index.headers_list();
let header_list = self.app.index().headers_list();
let blockhash = header_list.headers().get(height)?.hash();
let block: Block = self.daemon.getblock(&blockhash).unwrap();
let block: Block = self.app.daemon().getblock(&blockhash).unwrap();
let mut txids: Vec<Sha256dHash> = block.txdata.iter().map(|tx| tx.txid()).collect();
let pos = txids.iter().position(|txid| txid == tx_hash)?;
let mut merkle = Vec::new();
......@@ -307,11 +300,18 @@ impl<'a> Query<'a> {
Some((merkle, pos))
}
pub fn broadcast(&self, txn: &Transaction) -> Result<Sha256dHash> {
self.app
.daemon()
.broadcast(txn)
.chain_err(|| "broadcast failed")
}
pub fn update_mempool(&self) -> Result<()> {
self.tracker
.write()
.unwrap()
.update(self.daemon)
.update(self.app.daemon())
.chain_err(|| "failed to update mempool")
}
......
......@@ -8,6 +8,7 @@ use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::time::Duration;
......@@ -36,16 +37,16 @@ fn jsonify_header(header: &BlockHeader, height: usize) -> Value {
})
}
struct Connection<'a> {
query: &'a Query<'a>,
struct Connection {
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
}
impl<'a> Connection<'a> {
pub fn new(query: &'a Query, stream: TcpStream, addr: SocketAddr) -> Connection<'a> {
impl Connection {
pub fn new(query: Arc<Query>, stream: TcpStream, addr: SocketAddr) -> Connection {
Connection {
query: query,
last_header_entry: None, // disable header subscription for now
......@@ -147,10 +148,7 @@ impl<'a> Connection<'a> {
let tx = tx.as_str().chain_err(|| "non-string tx")?;
let tx = hex::decode(&tx).chain_err(|| "non-hex tx")?;
let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
let txid = self.query
.daemon()
.broadcast(&tx)
.chain_err(|| "broadcast failed")?;
let txid = self.query.broadcast(&tx).chain_err(|| "broadcast failed")?;
Ok(json!(txid.be_hex_string()))
}
......@@ -331,13 +329,13 @@ impl Channel {
}
}
pub fn serve(addr: &SocketAddr, query: &Query) {
pub fn serve(addr: &SocketAddr, query: Arc<Query>) {
let listener = TcpListener::bind(addr).unwrap();
info!("RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().unwrap();
info!("[{}] connected peer", addr);
Connection::new(query, stream, addr).run();
Connection::new(query.clone(), stream, addr).run();
info!("[{}] disconnected peer", addr);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment