Unverified commit 31e7539e authored by kenshin samourai, committed by GitHub

Merge pull request #3 from kenshin-samourai/develop_dojo

init master for addrindexrs
parents 4d0f52b8 eba463c9
## Installation
Install [latest Rust](https://rustup.rs/) (1.34+) and
[latest Bitcoin Core](https://bitcoincore.org/en/download/) (0.16+).
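If Rust isn't installed yet, the standard rustup one-liner (from the rustup site linked above) fetches and runs the official installer:
```bash
$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
```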
Also, install the following packages (on Debian):
```bash
$ sudo apt install clang cmake # for building 'rust-rocksdb'
```
First build should take ~20 minutes:
```bash
$ git clone https://github.com/Samourai-Wallet/addrindexrs
$ cd addrindexrs
$ cargo build --release
```
## Bitcoind configuration
Allow the Bitcoin daemon to sync before starting the indexer. The indexer requires that the bitcoin daemon isn't pruned and maintains a txindex:
```bash
$ bitcoind -server=1 -txindex=1 -prune=0
```
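Equivalently, the same settings can live in `~/.bitcoin/bitcoin.conf`:
```
server=1
txindex=1
prune=0
```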
If you are using `-rpcuser=USER` and `-rpcpassword=PASSWORD` for authentication, please use the `cookie="USER:PASSWORD"` option in one of the config files.
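For instance, a sketch of the corresponding entry in one of the config files described below (the value must match bitcoind's credentials):
```toml
# Illustrative only — must match the -rpcuser/-rpcpassword pair
# configured for bitcoind.
cookie = "USER:PASSWORD"
```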
Otherwise, [`~/.bitcoin/.cookie`](https://github.com/bitcoin/bitcoin/blob/021218
First index sync should take ~1.5 hours (on a dual core Intel CPU @ 3.3 GHz, 8 GB RAM, 1TB WD Blue HDD):
```bash
$ cargo run --release -- -vvv --timestamp --db-dir ./db --indexer-rpc-addr="127.0.0.1:50001"
2018-08-17T18:27:42 - INFO - NetworkInfo { version: 179900, subversion: "/Satoshi:0.17.99/" }
2018-08-17T18:27:42 - INFO - BlockchainInfo { chain: "main", blocks: 537204, headers: 537204, bestblockhash: "0000000000000000002956768ca9421a8ddf4e53b1d81e429bd0125a383e3636", pruned: false, initialblockdownload: false }
2018-08-17T18:27:42 - DEBUG - opening DB at "./db/mainnet"
```
You can specify options via command-line parameters, environment variables or using config files.
Note that the final DB size should be ~20% of the `blk*.dat` files, but it may increase to ~35% at the end of the initial sync (just before the [full compaction is invoked](https://github.com/facebook/rocksdb/wiki/Manual-Compaction)).
If initial sync fails due to `memory allocation of xxxxxxxx bytes failedAborted` errors, as may happen on devices with limited RAM, try the following arguments when starting `addrindexrs`. It should take roughly 18 hours to sync and compact the index on an ODROID-HC1 with 8 CPU cores @ 2GHz, 2GB RAM, and an SSD using the following command:
```bash
$ cargo run --release -- -vvvv --index-batch-size=10 --jsonrpc-import --db-dir ./db --indexer-rpc-addr="127.0.0.1:50001"
```
The index database is stored here:
```bash
$ du db/
```
## Configuration files and environment variables
The config files must be in TOML format. These config files are (from lowest priority to highest): `/etc/addrindexrs/config.toml`, `~/.addrindexrs/config.toml`, `./addrindexrs.toml`.
Options in higher-priority config files override options set in lower-priority config files. Environment variables override options in config files, and command-line arguments override everything else.
For each argument, an environment variable of the same name exists with the `ADDRINDEXRS_` prefix, upper-case letters and underscores instead of hyphens (e.g. you can use `ADDRINDEXRS_INDEXER_RPC_ADDR` instead of `--indexer-rpc-addr`). Similarly, for each argument a config-file option exists with underscores instead of hyphens (e.g. `indexer_rpc_addr`).
Finally, use a number in the config file to increase verbosity (e.g. `verbose = 3` is equivalent to `-vvv`) and a `true` value for flags (e.g. `timestamp = true`).
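Putting this together, a minimal `./addrindexrs.toml` might look like the following sketch (all values are illustrative):
```toml
# Option names mirror the CLI flags, with underscores instead of hyphens.
indexer_rpc_addr = "127.0.0.1:50001"
db_dir = "./db"
verbose = 3       # equivalent to -vvv
timestamp = true  # flag options take a boolean
```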
## Electrum client
```bash
# Connect only to the local server, for better privacy
$ ./scripts/local-electrum.bash
+ ADDR=127.0.0.1
+ PORT=50001
+ PROTOCOL=t
+ electrum --oneserver --server=127.0.0.1:50001:t
<snip>
```
You can persist Electrum configuration (see `~/.electrum/config`) using:
```bash
$ electrum setconfig oneserver true
$ electrum setconfig server 127.0.0.1:50001:t
$ electrum # will connect only to the local server
```
### SSL connection
In order to use a secure connection, you can also use [NGINX as an SSL endpoint](https://docs.nginx.com/nginx/admin-guide/security-controls/terminating-ssl-tcp/#) by placing the following block in `nginx.conf`.
```nginx
stream {
upstream addrindexrs {
server 127.0.0.1:50001;
}
server {
listen 50002 ssl;
proxy_pass addrindexrs;
ssl_certificate /path/to/example.crt;
ssl_certificate_key /path/to/example.key;
  }
}
```
```bash
$ sudo systemctl restart nginx
$ electrum --oneserver --server=example:50002:s
```
Note: You can obtain a free SSL certificate as follows:
1. Follow the instructions at https://certbot.eff.org/ to install the certbot on your system.
2. When certbot obtains the SSL certificates for you, change the SSL paths in the nginx template above as follows:
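A sketch, assuming certbot's default certificate layout (`<your-domain>` is a placeholder):
```nginx
ssl_certificate /etc/letsencrypt/live/<your-domain>/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/<your-domain>/privkey.pem;
```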
### Tor hidden service
```bash
$ sudo cat /var/lib/tor/hidden_service/hostname
<your-onion-address>.onion
```
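For reference, the hidden service above assumes a torrc entry along these lines (standard Tor configuration; the directory may differ on your system):
```
HiddenServiceDir /var/lib/tor/hidden_service/
HiddenServicePort 50001 127.0.0.1:50001
```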
On your client machine, run the following command (assuming Tor proxy service runs on port 9050):
```bash
$ electrum --oneserver --server <your-onion-address>.onion:50001:t --proxy socks5:127.0.0.1:9050
```
For more details, see http://docs.electrum.org/en/latest/tor.html.
### Sample Systemd Unit File
You may wish to have systemd manage addrindexrs so that it's "always on." Here is a sample unit file (which assumes that the bitcoind unit file is `bitcoind.service`):
```
[Unit]
Description=addrindexrs
After=bitcoind.service
[Service]
WorkingDirectory=/home/bitcoin/addrindexrs
ExecStart=/home/bitcoin/addrindexrs/target/release/addrindexrs --db-dir ./db --indexer-rpc-addr="127.0.0.1:50001"
User=bitcoin
Group=bitcoin
Type=simple
RestartSec=60
[Install]
WantedBy=multi-user.target
```
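Assuming the unit file is installed as `/etc/systemd/system/addrindexrs.service` (path illustrative), enable it the usual way:
```bash
$ sudo systemctl daemon-reload
$ sudo systemctl enable --now addrindexrs
```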
## Docker
```bash
$ docker build -t addrindexrs-app .
$ docker run --network host \
--volume $HOME/.bitcoin:/home/user/.bitcoin:ro \
--volume $PWD:/home/user \
--rm -i -t addrindexrs-app \
addrindexrs -vvvv --timestamp --db-dir /home/user/db
```
## Monitoring
Indexing and serving metrics are exported via [Prometheus](https://github.com/pingcap/rust-prometheus):
```bash
$ sudo apt install prometheus
$ echo "
scrape_configs:
- job_name: addrindexrs
static_configs:
- targets: ['localhost:4224']
" | sudo tee -a /etc/prometheus/prometheus.yml
$ sudo systemctl restart prometheus
$ firefox 'http://localhost:9090/graph?g0.range_input=1h&g0.expr=index_height&g0.tab=0'
```
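To sanity-check the exporter before Prometheus scrapes it, fetching the monitoring address directly should dump the metrics in plaintext (assuming the default port; the exact HTTP path may vary):
```bash
$ curl -s http://localhost:4224/ | grep ^index_height
```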
/// Benchmark full compaction.
extern crate electrs;
#[macro_use]
extern crate log;
extern crate error_chain;
use electrs::{config::Config, errors::*, store::DBStore};
use error_chain::ChainedError;
fn run(config: Config) -> Result<()> {
if !config.db_path.exists() {
panic!(
"DB {:?} must exist when running this benchmark!",
config.db_path
);
}
let store = DBStore::open(&config.db_path, /*low_memory=*/ true);
store.compact();
Ok(())
}
fn main() {
if let Err(e) = run(Config::from_args()) {
error!("{}", e.display_chain());
}
}
/// Benchmark regular indexing flow (using JSONRPC), don't persist the resulting index.
extern crate electrs;
extern crate error_chain;
#[macro_use]
extern crate log;
use electrs::{
cache::BlockTxIDsCache, config::Config, daemon::Daemon, errors::*, fake::FakeStore,
index::Index, metrics::Metrics, signal::Waiter,
};
use error_chain::ChainedError;
use std::sync::Arc;
fn run() -> Result<()> {
let signal = Waiter::start();
let config = Config::from_args();
let metrics = Metrics::new(config.monitoring_addr);
metrics.start();
let cache = Arc::new(BlockTxIDsCache::new(0, &metrics));
let daemon = Daemon::new(
&config.daemon_dir,
config.daemon_rpc_addr,
config.cookie_getter(),
config.network_type,
signal.clone(),
cache,
&metrics,
)?;
let fake_store = FakeStore {};
let index = Index::load(&fake_store, &daemon, &metrics, config.index_batch_size)?;
index.update(&fake_store, &signal)?;
Ok(())
}
fn main() {
if let Err(e) = run() {
error!("{}", e.display_chain());
}
}
extern crate electrs;
extern crate hex;
extern crate log;
use electrs::{config::Config, store::DBStore};
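// Scan all rows starting with `prefix` and report the longest common
// key prefix seen between consecutive rows (i.e. the worst hash collision).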
fn max_collision(store: DBStore, prefix: &[u8]) {
let prefix_len = prefix.len();
let mut prev: Option<Vec<u8>> = None;
let mut collision_max = 0;
for row in store.iter_scan(prefix) {
assert!(row.key.starts_with(prefix));
if let Some(prev) = prev {
let collision_len = prev
.iter()
.zip(row.key.iter())
.take_while(|(a, b)| a == b)
.count();
if collision_len > collision_max {
eprintln!(
"{} bytes collision found:\n{:?}\n{:?}\n",
collision_len - prefix_len,
revhex(&prev[prefix_len..]),
revhex(&row.key[prefix_len..]),
);
collision_max = collision_len;
}
}
prev = Some(row.key.to_vec());
}
}
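// Hex-encode a byte slice in reverse order (Bitcoin's display order for hashes).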
fn revhex(value: &[u8]) -> String {
hex::encode(&value.iter().cloned().rev().collect::<Vec<u8>>())
}
fn run(config: Config) {
if !config.db_path.exists() {
panic!("DB {:?} must exist when running this tool!", config.db_path);
}
let store = DBStore::open(&config.db_path, /*low_memory=*/ false);
max_collision(store, b"T");
}
fn main() {
run(Config::from_args());
}
#!/bin/bash
set -eux
ADDR=127.0.0.1 # localhost
PORT=50001 # default mainnet Electrum RPC port
PROTOCOL=t # TCP (no SSL)
# Use only local Electrum server:
electrum --oneserver --server="$ADDR:$PORT:$PROTOCOL" "$@"
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use std::sync::{Arc, Mutex};
use crate::{daemon, errors::*, index, signal::Waiter, store};
//
// Application
//
pub struct App {
store: store::DBStore,
index: index::Index,
daemon: daemon::Daemon,
tip: Mutex<Sha256dHash>,
}
impl App {
pub fn new(
store: store::DBStore,
index: index::Index,
daemon: daemon::Daemon
) -> Result<Arc<App>> {
Ok(Arc::new(App {
store,
index,
daemon: daemon.reconnect()?,
tip: Mutex::new(Sha256dHash::default()),
}))
}
fn write_store(&self) -> &impl store::WriteStore {
&self.store
}
// TODO: use index for queries.
pub fn read_store(&self) -> &dyn store::ReadStore {
&self.store
}
pub fn index(&self) -> &index::Index {
&self.index
}
pub fn daemon(&self) -> &daemon::Daemon {
&self.daemon
}
}
Ok(new_block)
}
}
extern crate addrindexrs;
extern crate error_chain;
#[macro_use]
use std::process;
use std::sync::Arc;
use std::time::Duration;
use addrindexrs::{
app::App,
bulk,
cache::BlockTxIDsCache,
config::Config,
daemon::Daemon,
errors::*,
index::Index,
query::Query,
rpc::RPC,
signal::Waiter,
    store::{full_compaction, is_fully_compacted, DBStore},
};
fn run_server(config: &Config) -> Result<()> {
let signal = Waiter::start();
let blocktxids_cache = Arc::new(BlockTxIDsCache::new(config.blocktxids_cache_size));
let daemon = Daemon::new(
&config.daemon_dir,
config.daemon_rpc_addr,
config.cookie_getter(),
config.network_type,
signal.clone(),
blocktxids_cache,
)?;
// Perform initial indexing from local blk*.dat block files.
let store = DBStore::open(&config.db_path, /*low_memory=*/ config.jsonrpc_import);
let index = Index::load(&store, &daemon, config.index_batch_size)?;
let store = if is_fully_compacted(&store) {
// initial import and full compaction are over
store
} else if config.jsonrpc_import {
// slower: uses JSONRPC for fetching blocks
index.update(&store, &signal)?;
full_compaction(store)
} else {
// faster, but uses more memory
let store =
bulk::index_blk_files(&daemon, config.bulk_index_threads, &signal, store)?;
let store = full_compaction(store);
// make sure the block header index is up-to-date
index.reload(&store);
store
}
.enable_compaction(); // enable auto compactions before starting incremental index updates.
let app = App::new(store, index, daemon)?;
let query = Query::new(app.clone(), config.txid_limit);
let mut server = None; // Indexer RPC server
loop {
app.update(&signal)?;
query.update_mempool()?;
server.get_or_insert_with(|| RPC::start(config.indexer_rpc_addr, query.clone()));
if let Err(err) = signal.wait(Duration::from_secs(5)) {
info!("stopping server: {}", err);
break;
use std::thread;
use crate::daemon::Daemon;
use crate::errors::*;
use crate::index::{index_block, last_indexed_block, read_indexed_blockhashes};
use crate::signal::Waiter;
use crate::store::{DBStore, Row, WriteStore};
use crate::util::{spawn_thread, HeaderList, SyncChannel};
//
// Blockchain parser (bulk mode)
//
struct Parser {
magic: u32,
current_headers: HeaderList,
indexed_blockhashes: Mutex<HashSet<Sha256dHash>>,
}
impl Parser {
fn new(
daemon: &Daemon,
indexed_blockhashes: HashSet<Sha256dHash>,
) -> Result<Arc<Parser>> {
Ok(Arc::new(Parser {
magic: daemon.magic(),
current_headers: load_headers(daemon)?,
indexed_blockhashes: Mutex::new(indexed_blockhashes),
}))
}
}
fn read_blkfile(&self, path: &Path) -> Result<Vec<u8>> {
let timer = self.duration.with_label_values(&["read"]).start_timer();
let blob = fs::read(&path).chain_err(|| format!("failed to read {:?}", path))?;
Ok(blob)
}
fn index_blkfile(&self, blob: Vec<u8>) -> Result<Vec<Row>> {
let timer = self.duration.with_label_values(&["parse"]).start_timer();
let blocks = parse_blocks(blob, self.magic)?;
let mut rows = Vec::<Row>::new();
let timer = self.duration.with_label_values(&["index"]).start_timer();
for block in blocks {
let blockhash = block.bitcoin_hash();
if let Some(_header) = self.current_headers.header_by_blockhash(&blockhash) {
if self.indexed_blockhashes
.lock()
.expect("indexed_blockhashes")
.insert(blockhash)
{
rows.extend(index_block(&block));
}
} else {
// will be indexed later (after bulk load is over) if not an orphan block
self.block_count.with_label_values(&["skipped"]).inc();
}
}
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
Ok(rows)
}
}
//
// Parse the bitcoin blocks
//
fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<Block>> {
let mut cursor = Cursor::new(&blob);
let mut blocks = vec![];
let max_pos = blob.len() as u64;
while cursor.position() < max_pos {
let offset = cursor.position();
match u32::consensus_decode(&mut cursor) {
Ok(value) => {
if magic != value {
}
Err(_) => break, // EOF
};
let block_size = u32::consensus_decode(&mut cursor).chain_err(|| "no block size")?;
let start = cursor.position();
let end = start + block_size as u64;
}
Err(_) => break, // EOF
}
let block: Block = deserialize(&blob[start as usize..end as usize])
.chain_err(|| format!("failed to parse block at {}..{}", start, end))?;
blocks.push(block);
cursor.set_position(end as u64);
}
Ok(blocks)
}
//
// Retrieve the block headers
//
fn load_headers(daemon: &Daemon) -> Result<HeaderList> {
let tip = daemon.getbestblockhash()?;
let mut headers = HeaderList::empty();
Ok(headers)
}
//
// Manage open file limits
//
fn set_open_files_limit(limit: libc::rlim_t) {
let resource = libc::RLIMIT_NOFILE;
let mut rlim = libc::rlimit {
}
}
type JoinHandle = thread::JoinHandle<Result<()>>;
type BlobReceiver = Arc<Mutex<Receiver<(Vec<u8>, PathBuf)>>>;
//
// Read the blk*.dat files and send their contents to the indexing threads
//
fn start_reader(blk_files: Vec<PathBuf>, parser: Arc<Parser>) -> (BlobReceiver, JoinHandle) {
let chan = SyncChannel::new(0);
let blobs = chan.sender();
(Arc::new(Mutex::new(chan.into_receiver())), handle)
}