index.rs 10.5 KB
Newer Older
1
use bincode;
2
use bitcoin::blockdata::block::{Block, BlockHeader};
3
use bitcoin::blockdata::transaction::{Transaction, TxIn, TxOut};
4 5
use bitcoin::consensus::encode::{deserialize, serialize};
use bitcoin::util::hash::BitcoinHash;
Roman Zeyde's avatar
Roman Zeyde committed
6
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
7 8
use crypto::digest::Digest;
use crypto::sha2::Sha256;
kenshin-samourai's avatar
kenshin-samourai committed
9
use std::collections::HashSet;
10
use std::sync::RwLock;
11

Roman Zeyde's avatar
Roman Zeyde committed
12 13 14 15 16
use crate::daemon::Daemon;
use crate::errors::*;
use crate::signal::Waiter;
use crate::store::{ReadStore, Row, WriteStore};
use crate::util::{
kenshin-samourai's avatar
kenshin-samourai committed
17 18
    full_hash, hash_prefix, spawn_thread, Bytes,
    FullHash, HashPrefix, HeaderEntry, HeaderList,
Roman Zeyde's avatar
Roman Zeyde committed
19 20
    HeaderMap, SyncChannel, HASH_PREFIX_LEN,
};
21

kenshin-samourai's avatar
kenshin-samourai committed
22 23 24
//
// Key of a row storing an input of a transaction
//
25
#[derive(Serialize, Deserialize)]
26 27
pub struct TxInKey {
    pub code: u8,
kenshin-samourai's avatar
kenshin-samourai committed
28 29
    pub prev_txid_prefix: HashPrefix,
    pub prev_vout: u16,
30 31
}

kenshin-samourai's avatar
kenshin-samourai committed
32 33 34
//
// Row storing an input of a transaction
//
35
#[derive(Serialize, Deserialize)]
36
pub struct TxInRow {
37
    key: TxInKey,
38
    pub txid_prefix: HashPrefix,
39 40
}

41 42 43 44 45
impl TxInRow {
    pub fn new(txid: &Sha256dHash, input: &TxIn) -> TxInRow {
        TxInRow {
            key: TxInKey {
                code: b'I',
kenshin-samourai's avatar
kenshin-samourai committed
46 47
                prev_txid_prefix: hash_prefix(&input.previous_output.txid[..]),
                prev_vout: input.previous_output.vout as u16,
48 49 50 51 52
            },
            txid_prefix: hash_prefix(&txid[..]),
        }
    }

kenshin-samourai's avatar
kenshin-samourai committed
53
    pub fn filter(txid: &Sha256dHash, vout: usize) -> Bytes {
54 55
        bincode::serialize(&TxInKey {
            code: b'I',
kenshin-samourai's avatar
kenshin-samourai committed
56 57
            prev_txid_prefix: hash_prefix(&txid[..]),
            prev_vout: vout as u16,
Roman Zeyde's avatar
Roman Zeyde committed
58 59
        })
        .unwrap()
60 61 62 63 64 65 66 67 68 69 70 71 72 73
    }

    pub fn to_row(&self) -> Row {
        Row {
            key: bincode::serialize(&self).unwrap(),
            value: vec![],
        }
    }

    pub fn from_row(row: &Row) -> TxInRow {
        bincode::deserialize(&row.key).expect("failed to parse TxInRow")
    }
}

kenshin-samourai's avatar
kenshin-samourai committed
74 75 76
//
// Key of a row storing an output of a transaction
//
77
#[derive(Serialize, Deserialize)]
78
pub struct TxOutKey {
79
    code: u8,
80
    script_hash_prefix: HashPrefix,
81 82
}

kenshin-samourai's avatar
kenshin-samourai committed
83 84 85
//
// Row storing an output of a transaction
//
86
#[derive(Serialize, Deserialize)]
87
pub struct TxOutRow {
88
    key: TxOutKey,
89
    pub txid_prefix: HashPrefix,
kenshin-samourai's avatar
kenshin-samourai committed
90
    pub vout: u16,
91 92
}

93
impl TxOutRow {
kenshin-samourai's avatar
kenshin-samourai committed
94
    pub fn new(txid: &Sha256dHash, vout: u32, output: &TxOut) -> TxOutRow {
95 96 97 98 99 100
        TxOutRow {
            key: TxOutKey {
                code: b'O',
                script_hash_prefix: hash_prefix(&compute_script_hash(&output.script_pubkey[..])),
            },
            txid_prefix: hash_prefix(&txid[..]),
kenshin-samourai's avatar
kenshin-samourai committed
101
            vout: vout as u16,
102 103 104 105 106 107 108
        }
    }

    pub fn filter(script_hash: &[u8]) -> Bytes {
        bincode::serialize(&TxOutKey {
            code: b'O',
            script_hash_prefix: hash_prefix(&script_hash[..HASH_PREFIX_LEN]),
Roman Zeyde's avatar
Roman Zeyde committed
109 110
        })
        .unwrap()
111 112 113 114 115 116 117 118 119 120 121 122 123 124
    }

    pub fn to_row(&self) -> Row {
        Row {
            key: bincode::serialize(&self).unwrap(),
            value: vec![],
        }
    }

    pub fn from_row(row: &Row) -> TxOutRow {
        bincode::deserialize(&row.key).expect("failed to parse TxOutRow")
    }
}

kenshin-samourai's avatar
kenshin-samourai committed
125 126 127
//
// Key of a row storing a transaction
//
128
#[derive(Serialize, Deserialize)]
129
pub struct TxKey {
130
    code: u8,
131
    pub txid: FullHash,
132 133
}

kenshin-samourai's avatar
kenshin-samourai committed
134 135 136
//
// Row storing a transaction
//
kenshin-samourai's avatar
kenshin-samourai committed
137
#[derive(Serialize, Deserialize)]
138 139 140 141 142
pub struct TxRow {
    pub key: TxKey,
}

impl TxRow {
kenshin-samourai's avatar
kenshin-samourai committed
143
    pub fn new(txid: &Sha256dHash) -> TxRow {
144 145 146 147 148 149 150 151
        TxRow {
            key: TxKey {
                code: b'T',
                txid: full_hash(&txid[..]),
            },
        }
    }

Roman Zeyde's avatar
Roman Zeyde committed
152
    pub fn filter_prefix(txid_prefix: HashPrefix) -> Bytes {
153 154 155
        [b"T", &txid_prefix[..]].concat()
    }

156 157 158 159
    pub fn filter_full(txid: &Sha256dHash) -> Bytes {
        [b"T", &txid[..]].concat()
    }

160 161
    pub fn to_row(&self) -> Row {
        Row {
kenshin-samourai's avatar
kenshin-samourai committed
162 163
            key: bincode::serialize(&self).unwrap(),
            value: vec![],
164 165 166 167
        }
    }

    pub fn from_row(row: &Row) -> TxRow {
kenshin-samourai's avatar
kenshin-samourai committed
168
        bincode::deserialize(&row.key).expect("failed to parse TxRow")
169 170 171
    }
}

kenshin-samourai's avatar
kenshin-samourai committed
172 173 174
//
// Key of a row storing a block
//
175
#[derive(Serialize, Deserialize)]
Roman Zeyde's avatar
Roman Zeyde committed
176
struct BlockKey {
177 178 179 180
    code: u8,
    hash: FullHash,
}

kenshin-samourai's avatar
kenshin-samourai committed
181 182 183
//
// Compute the script hash of a scriptpubkey
//
184
pub fn compute_script_hash(data: &[u8]) -> FullHash {
185
    let mut hash = FullHash::default();
186 187 188
    let mut sha2 = Sha256::new();
    sha2.input(data);
    sha2.result(&mut hash);
189
    hash
190 191
}

kenshin-samourai's avatar
kenshin-samourai committed
192 193 194
//
// Index a transaction
//
kenshin-samourai's avatar
kenshin-samourai committed
195
pub fn index_transaction<'a>(txn: &'a Transaction) -> impl 'a + Iterator<Item = Row> {
196 197
    let null_hash = Sha256dHash::default();
    let txid: Sha256dHash = txn.txid();
198 199

    let inputs = txn.input.iter().filter_map(move |input| {
200
        if input.previous_output.txid == null_hash {
201 202 203
            None
        } else {
            Some(TxInRow::new(&txid, &input).to_row())
204
        }
205
    });
kenshin-samourai's avatar
kenshin-samourai committed
206

Roman Zeyde's avatar
Roman Zeyde committed
207 208 209
    let outputs = txn
        .output
        .iter()
kenshin-samourai's avatar
kenshin-samourai committed
210 211
        .enumerate()
        .map(move |(vout, output)| TxOutRow::new(&txid, vout as u32, &output).to_row());
212

Roman Zeyde's avatar
Roman Zeyde committed
213 214
    inputs
        .chain(outputs)
kenshin-samourai's avatar
kenshin-samourai committed
215
        .chain(std::iter::once(TxRow::new(&txid).to_row()))
216 217
}

kenshin-samourai's avatar
kenshin-samourai committed
218 219 220
//
// Index a block
//
kenshin-samourai's avatar
kenshin-samourai committed
221
pub fn index_block<'a>(block: &'a Block) -> impl 'a + Iterator<Item = Row> {
222
    let blockhash = block.bitcoin_hash();
223
    // Persist block hash and header
224
    let row = Row {
225 226 227
        key: bincode::serialize(&BlockKey {
            code: b'B',
            hash: full_hash(&blockhash[..]),
Roman Zeyde's avatar
Roman Zeyde committed
228 229
        })
        .unwrap(),
230
        value: serialize(&block.header),
231
    };
Roman Zeyde's avatar
Roman Zeyde committed
232 233 234
    block
        .txdata
        .iter()
kenshin-samourai's avatar
kenshin-samourai committed
235
        .flat_map(move |txn| index_transaction(&txn))
Roman Zeyde's avatar
Roman Zeyde committed
236
        .chain(std::iter::once(row))
237 238
}

kenshin-samourai's avatar
kenshin-samourai committed
239 240 241
//
// Retrieve the last indexed block
//
242
pub fn last_indexed_block(blockhash: &Sha256dHash) -> Row {
243
    // Store last indexed block (i.e. all previous blocks were indexed)
244
    Row {
245
        key: b"L".to_vec(),
246
        value: serialize(blockhash),
247
    }
248 249
}

kenshin-samourai's avatar
kenshin-samourai committed
250 251 252
//
// Retrieve the hashes of all the indexed blocks
//
253
pub fn read_indexed_blockhashes(store: &dyn ReadStore) -> HashSet<Sha256dHash> {
254 255 256 257 258 259 260 261
    let mut result = HashSet::new();
    for row in store.scan(b"B") {
        let key: BlockKey = bincode::deserialize(&row.key).unwrap();
        result.insert(deserialize(&key.hash).unwrap());
    }
    result
}

kenshin-samourai's avatar
kenshin-samourai committed
262 263 264
//
// Retrieve the headers of all the indexed blocks
//
265
fn read_indexed_headers(store: &dyn ReadStore) -> HeaderList {
266 267 268 269 270
    let latest_blockhash: Sha256dHash = match store.get(b"L") {
        // latest blockheader persisted in the DB.
        Some(row) => deserialize(&row).unwrap(),
        None => Sha256dHash::default(),
    };
271
    trace!("lastest indexed blockhash: {}", latest_blockhash);
kenshin-samourai's avatar
kenshin-samourai committed
272

273
    let mut map = HeaderMap::new();
274 275 276
    for row in store.scan(b"B") {
        let key: BlockKey = bincode::deserialize(&row.key).unwrap();
        let header: BlockHeader = deserialize(&row.value).unwrap();
277
        map.insert(deserialize(&key.hash).unwrap(), header);
278
    }
kenshin-samourai's avatar
kenshin-samourai committed
279

280 281 282
    let mut headers = vec![];
    let null_hash = Sha256dHash::default();
    let mut blockhash = latest_blockhash;
kenshin-samourai's avatar
kenshin-samourai committed
283

284
    while blockhash != null_hash {
Roman Zeyde's avatar
Roman Zeyde committed
285 286
        let header = map
            .remove(&blockhash)
Roman Zeyde's avatar
Roman Zeyde committed
287
            .unwrap_or_else(|| panic!("missing {} header in DB", blockhash));
288 289
        blockhash = header.prev_blockhash;
        headers.push(header);
290
    }
kenshin-samourai's avatar
kenshin-samourai committed
291

292
    headers.reverse();
kenshin-samourai's avatar
kenshin-samourai committed
293

294 295 296 297 298 299 300
    assert_eq!(
        headers
            .first()
            .map(|h| h.prev_blockhash)
            .unwrap_or(null_hash),
        null_hash
    );
kenshin-samourai's avatar
kenshin-samourai committed
301

302 303 304
    assert_eq!(
        headers
            .last()
Roman Zeyde's avatar
Roman Zeyde committed
305
            .map(BitcoinHash::bitcoin_hash)
306 307 308
            .unwrap_or(null_hash),
        latest_blockhash
    );
kenshin-samourai's avatar
kenshin-samourai committed
309

310 311
    let mut result = HeaderList::empty();
    let entries = result.order(headers);
312
    result.apply(entries, latest_blockhash);
313
    result
314 315
}

kenshin-samourai's avatar
kenshin-samourai committed
316 317 318
//
// Indexer
//
319
pub struct Index {
Roman Zeyde's avatar
Roman Zeyde committed
320
    // TODO: store also latest snapshot.
321
    headers: RwLock<HeaderList>,
322
    daemon: Daemon,
323
    batch_size: usize,
324 325 326
}

impl Index {
327
    pub fn load(
328
        store: &dyn ReadStore,
329 330 331
        daemon: &Daemon,
        batch_size: usize,
    ) -> Result<Index> {
332
        let headers = read_indexed_headers(store);
333
        Ok(Index {
334
            headers: RwLock::new(headers),
335
            daemon: daemon.reconnect()?,
336
            batch_size,
337
        })
338 339
    }

340
    pub fn reload(&self, store: &dyn ReadStore) {
341 342 343 344
        let mut headers = self.headers.write().unwrap();
        *headers = read_indexed_headers(store);
    }

345 346 347 348 349
    pub fn best_header(&self) -> Option<HeaderEntry> {
        let headers = self.headers.read().unwrap();
        headers.header_by_blockhash(&headers.tip()).cloned()
    }

350
    pub fn get_header(&self, height: usize) -> Option<HeaderEntry> {
351 352 353 354 355
        self.headers
            .read()
            .unwrap()
            .header_by_height(height)
            .cloned()
356 357
    }

358
    pub fn update(&self, store: &impl WriteStore, waiter: &Waiter) -> Result<Sha256dHash> {
359
        let daemon = self.daemon.reconnect()?;
360
        let tip = daemon.getbestblockhash()?;
361

362
        let new_headers: Vec<HeaderEntry> = {
363 364 365
            let indexed_headers = self.headers.read().unwrap();
            indexed_headers.order(daemon.get_new_headers(&indexed_headers, &tip)?)
        };
366

Roman Zeyde's avatar
Roman Zeyde committed
367 368 369
        if let Some(latest_header) = new_headers.last() {
            info!("{:?} ({} left to index)", latest_header, new_headers.len());
        };
370

371 372
        let chan = SyncChannel::new(1);
        let sender = chan.sender();
373
        let blockhashes: Vec<Sha256dHash> = new_headers.iter().map(|h| *h.hash()).collect();
374
        let batch_size = self.batch_size;
375

376
        let fetcher = spawn_thread("fetcher", move || {
377
            for chunk in blockhashes.chunks(batch_size) {
378 379 380
                sender
                    .send(daemon.getblocks(&chunk))
                    .expect("failed sending blocks to be indexed");
381
            }
382 383 384
            sender
                .send(Ok(vec![]))
                .expect("failed sending explicit end of stream");
385
        });
386

387
        loop {
388
            waiter.poll()?;
389

Roman Zeyde's avatar
Roman Zeyde committed
390 391
            let batch = chan
                .receiver()
392 393
                .recv()
                .expect("block fetch exited prematurely")?;
394

395 396 397 398
            if batch.is_empty() {
                break;
            }

399
            let rows_iter = batch.iter().flat_map(|block| {
400
                let blockhash = block.bitcoin_hash();
kenshin-samourai's avatar
kenshin-samourai committed
401
                info!("indexing block {}", blockhash);
kenshin-samourai's avatar
kenshin-samourai committed
402
                index_block(block).chain(std::iter::once(last_indexed_block(&blockhash)))
403 404
            });

405
            store.write(rows_iter);
406
        }
407

408
        store.flush(); // make sure no row is left behind
409
        fetcher.join().expect("block fetcher failed");
410 411
        self.headers.write().unwrap().apply(new_headers, tip);
        assert_eq!(tip, self.headers.read().unwrap().tip());
412
        Ok(tip)
413 414
    }
}