Commit 55177549 authored by kenshin-samourai's avatar kenshin-samourai
Browse files

Merge branch 'feat_rest_api' into 'develop_dojo'

implement a http rest api

See merge request !9
parents c20d8c5c 7b745e45
This diff is collapsed.
...@@ -22,31 +22,37 @@ lto = true ...@@ -22,31 +22,37 @@ lto = true
latest_rust = [] # use latest Rust features (otherwise, support Rust 1.34) latest_rust = [] # use latest Rust features (otherwise, support Rust 1.34)
[dependencies] [dependencies]
base64 = "0.10" base64 = "=0.10"
bincode = "1.0" bincode = "=1.0"
bitcoin = { version = "0.21", features = ["use-serde"] } bitcoin = { version = "=0.21", features = ["use-serde"] }
bitcoin_hashes = "0.7.1" bitcoin_hashes = "=0.7.1"
configure_me = "0.3.3" configure_me = "=0.3.3"
crossbeam-channel = "0.3" crossbeam-channel = "=0.3"
dirs = "1.0" dirs = "=1.0"
error-chain = "0.12" error-chain = "=0.12"
glob = "0.3" glob = "=0.3"
hex = "0.3" hex = "=0.3"
libc = "0.2" hyper = "=0.13.6"
log = "0.4" hyperlocal = "=0.7"
lru = "0.1" libc = "=0.2"
num_cpus = "1.0" log = "=0.4"
page_size = "0.4" lru = "=0.1"
rocksdb = "0.12" num_cpus = "=1.0"
rust-crypto = "0.2" page_size = "=0.4"
serde = "1.0" rocksdb = "=0.12"
serde_derive = "1.0" rust-crypto = "=0.2"
serde_json = "1.0" serde = "=1.0"
signal-hook = "0.1" serde_derive = "=1.0"
stderrlog = "0.4.1" serde_json = "=1.0"
sysconf = ">=0.3.4" signal-hook = "=0.1"
time = "0.1" socket2 = { version = "=0.3.18", features = ["reuseport"] }
tiny_http = "0.6" stderrlog = "=0.4.1"
sysconf = "=0.3.4"
time = "=0.1"
tiny_http = "=0.6"
# close to same tokio version as dependent by hyper v0.13.6 and hyperlocal 0.7 -- things can go awry if they mismatch
tokio = { version = "=0.2.6", features = ["sync", "macros"] }
url = "=2.2.0"
[build-dependencies] [build-dependencies]
configure_me_codegen = "0.3.12" configure_me_codegen = "=0.3.12"
...@@ -45,6 +45,11 @@ name = "indexer_rpc_addr" ...@@ -45,6 +45,11 @@ name = "indexer_rpc_addr"
type = "crate::config::ResolvAddr" type = "crate::config::ResolvAddr"
doc = "Indexer JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)" doc = "Indexer JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)"
[[param]]
name = "indexer_http_addr"
type = "crate::config::ResolvAddr"
doc = "Indexer REST 'addr:port' to listen on (default: 127.0.0.1:80)"
[[param]] [[param]]
name = "daemon_rpc_addr" name = "daemon_rpc_addr"
type = "crate::config::ResolvAddr" type = "crate::config::ResolvAddr"
......
...@@ -20,11 +20,12 @@ use addrindexrs::{ ...@@ -20,11 +20,12 @@ use addrindexrs::{
index::Index, index::Index,
query::Query, query::Query,
rpc::RPC, rpc::RPC,
rest,
signal::Waiter, signal::Waiter,
store::{full_compaction, is_fully_compacted, DBStore}, store::{full_compaction, is_fully_compacted, DBStore},
}; };
fn run_server(config: &Config) -> Result<()> { fn run_server(config: Arc<Config>) -> Result<()> {
let signal = Waiter::start(); let signal = Waiter::start();
let blocktxids_cache = Arc::new(BlockTxIDsCache::new(config.blocktxids_cache_size)); let blocktxids_cache = Arc::new(BlockTxIDsCache::new(config.blocktxids_cache_size));
...@@ -62,6 +63,7 @@ fn run_server(config: &Config) -> Result<()> { ...@@ -62,6 +63,7 @@ fn run_server(config: &Config) -> Result<()> {
let app = App::new(store, index, daemon)?; let app = App::new(store, index, daemon)?;
let query = Query::new(app.clone(), config.txid_limit); let query = Query::new(app.clone(), config.txid_limit);
let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query));
let mut server = None; // Indexer RPC server let mut server = None; // Indexer RPC server
loop { loop {
app.update(&signal)?; app.update(&signal)?;
...@@ -69,6 +71,7 @@ fn run_server(config: &Config) -> Result<()> { ...@@ -69,6 +71,7 @@ fn run_server(config: &Config) -> Result<()> {
server.get_or_insert_with(|| RPC::start(config.indexer_rpc_addr, query.clone())); server.get_or_insert_with(|| RPC::start(config.indexer_rpc_addr, query.clone()));
if let Err(err) = signal.wait(Duration::from_secs(5)) { if let Err(err) = signal.wait(Duration::from_secs(5)) {
info!("stopping server: {}", err); info!("stopping server: {}", err);
rest_server.stop();
break; break;
} }
} }
...@@ -77,7 +80,7 @@ fn run_server(config: &Config) -> Result<()> { ...@@ -77,7 +80,7 @@ fn run_server(config: &Config) -> Result<()> {
fn main() { fn main() {
let config = Config::from_args(); let config = Config::from_args();
if let Err(e) = run_server(&config) { if let Err(e) = run_server(Arc::new(config)) {
error!("server failed: {}", e.display_chain()); error!("server failed: {}", e.display_chain());
process::exit(1); process::exit(1);
} }
......
...@@ -147,6 +147,7 @@ pub struct Config { ...@@ -147,6 +147,7 @@ pub struct Config {
pub bulk_index_threads: usize, pub bulk_index_threads: usize,
pub txid_limit: usize, pub txid_limit: usize,
pub blocktxids_cache_size: usize, pub blocktxids_cache_size: usize,
pub indexer_http_addr: SocketAddr,
} }
/// Returns default daemon directory /// Returns default daemon directory
...@@ -200,6 +201,12 @@ impl Config { ...@@ -200,6 +201,12 @@ impl Config {
Network::Regtest => 60401, Network::Regtest => 60401,
}; };
let default_http_port = match config.network {
Network::Bitcoin => 8080,
Network::Testnet => 8080,
Network::Regtest => 8080,
};
let daemon_rpc_addr: SocketAddr = config.daemon_rpc_addr.map_or( let daemon_rpc_addr: SocketAddr = config.daemon_rpc_addr.map_or(
(DEFAULT_SERVER_ADDRESS, default_daemon_port).into(), (DEFAULT_SERVER_ADDRESS, default_daemon_port).into(),
ResolvAddr::resolve_or_exit, ResolvAddr::resolve_or_exit,
...@@ -210,6 +217,11 @@ impl Config { ...@@ -210,6 +217,11 @@ impl Config {
ResolvAddr::resolve_or_exit, ResolvAddr::resolve_or_exit,
); );
let indexer_http_addr: SocketAddr = config.indexer_http_addr.map_or(
(DEFAULT_SERVER_ADDRESS, default_http_port).into(),
ResolvAddr::resolve_or_exit,
);
match config.network { match config.network {
Network::Bitcoin => (), Network::Bitcoin => (),
Network::Testnet => config.daemon_dir.push("testnet3"), Network::Testnet => config.daemon_dir.push("testnet3"),
...@@ -251,6 +263,7 @@ impl Config { ...@@ -251,6 +263,7 @@ impl Config {
daemon_rpc_addr, daemon_rpc_addr,
cookie: config.cookie, cookie: config.cookie,
indexer_rpc_addr, indexer_rpc_addr,
indexer_http_addr,
jsonrpc_import: config.jsonrpc_import, jsonrpc_import: config.jsonrpc_import,
index_batch_size: config.index_batch_size, index_batch_size: config.index_batch_size,
bulk_index_threads: config.bulk_index_threads, bulk_index_threads: config.bulk_index_threads,
......
...@@ -20,6 +20,7 @@ pub mod errors; ...@@ -20,6 +20,7 @@ pub mod errors;
pub mod index; pub mod index;
pub mod mempool; pub mod mempool;
pub mod query; pub mod query;
pub mod rest;
pub mod rpc; pub mod rpc;
pub mod signal; pub mod signal;
pub mod store; pub mod store;
......
#![allow(dead_code)]
use bitcoin::consensus::encode::deserialize; use bitcoin::consensus::encode::deserialize;
use bitcoin_hashes::sha256d::Hash as Sha256dHash; use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
......
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unused_mut)]
use crate::config::Config;
use crate::query::Query;
use crate::util::{create_socket};
use crate::errors;
use crate::errors::ResultExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Response, Server, StatusCode};
use tokio::sync::oneshot;
use serde::Serialize;
use serde_json;
use std::collections::HashMap;
use std::num::ParseIntError;
use std::sync::Arc;
use std::thread;
use url::form_urlencoded;
use bitcoin::consensus::encode;
use bitcoin::consensus::encode::serialize;
use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use bitcoin::hashes::Error as HashError;
use hex::{self, FromHexError};
const TTL_LONG: u32 = 157_784_630; // ttl for static resources (5 years)
const TTL_SHORT: u32 = 10; // ttl for volatile resources
const TTL_MEMPOOL_RECENT: u32 = 5; // ttl for GET /mempool/recent
#[tokio::main]
async fn run_server(config: Arc<Config>, query: Arc<Query>, rx: oneshot::Receiver<()>) {
let addr = &config.indexer_http_addr;
let config = Arc::clone(&config);
let query = Arc::clone(&query);
let make_service_fn_inn = || {
let query = Arc::clone(&query);
let config = Arc::clone(&config);
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let query = Arc::clone(&query);
let config = Arc::clone(&config);
async move {
let method = req.method().clone();
let uri = req.uri().clone();
let body = hyper::body::to_bytes(req.into_body()).await?;
let mut resp = handle_request(method, uri, body, &query, &config)
.unwrap_or_else(|err| {
warn!("{:?}", err);
Response::builder()
.status(err.0)
.header("Content-Type", "text/plain")
.body(Body::from(err.1))
.unwrap()
});
// if let Some(ref origins) = config.cors {
// resp.headers_mut()
// .insert("Access-Control-Allow-Origin", origins.parse().unwrap());
// }
Ok::<_, hyper::Error>(resp)
}
}))
}
};
info!("REST server running on {}", addr);
let socket = create_socket(&addr);
socket.listen(511).expect("setting backlog failed");
let server = Server::from_tcp(socket.into_tcp_listener())
.expect("Server::from_tcp failed")
.serve(make_service_fn(move |_| make_service_fn_inn()))
.with_graceful_shutdown(async {
rx.await.ok();
})
.await;
if let Err(e) = server {
eprintln!("server error: {}", e);
}
}
pub fn start(config: Arc<Config>, query: Arc<Query>) -> Handle {
let (tx, rx) = oneshot::channel::<()>();
Handle {
tx,
thread: thread::spawn(move || {
run_server(config, query, rx);
}),
}
}
pub struct Handle {
tx: oneshot::Sender<()>,
thread: thread::JoinHandle<()>,
}
impl Handle {
pub fn stop(self) {
self.tx.send(()).expect("failed to send shutdown signal");
self.thread.join().expect("REST server failed");
}
}
fn json_response<T: Serialize>(value: T, ttl: u32) -> Result<Response<Body>, HttpError> {
let value = serde_json::to_string(&value)?;
Ok(Response::builder()
.header("Content-Type", "application/json")
.header("Cache-Control", format!("public, max-age={:}", ttl))
.body(Body::from(value))
.unwrap())
}
fn handle_request(
method: Method,
uri: hyper::Uri,
body: hyper::body::Bytes,
query: &Query,
config: &Config,
) -> Result<Response<Body>, HttpError> {
// TODO it looks hyper does not have routing and query parsing :(
let path: Vec<&str> = uri.path().split('/').skip(1).collect();
let query_params = match uri.query() {
Some(value) => form_urlencoded::parse(&value.as_bytes())
.into_owned()
.collect::<HashMap<String, String>>(),
None => HashMap::new(),
};
info!("handle {:?} {:?}", method, uri);
match (
&method,
path.get(0),
path.get(1),
path.get(2),
path.get(3),
path.get(4),
) {
(&Method::GET, Some(&"blockchain"), Some(&"scripthash"), Some(script_str), Some(&"balance"), None) => {
json_response(
json!({"confirmed": null, "unconfirmed": null}),
TTL_MEMPOOL_RECENT
)
}
(&Method::GET, Some(&"blockchain"), Some(&"scripthash"), Some(script_str), Some(&"history"), None) => {
let script_hash = Sha256dHash::from_hex(script_str).chain_err(|| "bad script_hash")?;
let status = query.status(&script_hash[..])?;
json_response(
json!(serde_json::Value::Array(
status
.history()
.into_iter()
.map(|item| json!({"tx_hash": item.to_hex()}))
.collect()
)),
TTL_MEMPOOL_RECENT
)
}
(&Method::GET, Some(&"blockchain"), Some(&"scripthashes"), Some(&"history"), None, None) => {
let script_hashes = query_params
.get("scripthashes")
.cloned()
.ok_or_else(|| HttpError::from("Missing scripthashes".to_string()))?;
let v_script_hashes: Vec<&str> = script_hashes.split(',').collect();
let mut v_txids = vec![];
for script_str in v_script_hashes.iter() {
let script_hash = Sha256dHash::from_hex(script_str).chain_err(|| "bad script_hash")?;
let status = query.status(&script_hash[..])?;
let txids: Vec<String> = status
.history()
.into_iter()
.map(|item| item.to_hex())
.collect();
v_txids.push(json!({
"script_hash": script_str,
"txids": txids}));
}
json_response(
json!(serde_json::Value::Array(v_txids)),
TTL_MEMPOOL_RECENT
)
}
(&Method::GET, Some(&"blocks"), Some(&"tip"), None, None, None) => {
let entry = query.get_best_header()?;
let hex_header = hex::encode(serialize(entry.header()));
json_response(
json!({"hex": hex_header, "height": entry.height()}),
TTL_SHORT
)
}
_ => Err(HttpError::not_found(format!(
"endpoint does not exist {:?}",
uri.path()
))),
}
}
#[derive(Debug)]
struct HttpError(StatusCode, String);
impl HttpError {
fn not_found(msg: String) -> Self {
HttpError(StatusCode::NOT_FOUND, msg)
}
}
impl From<String> for HttpError {
fn from(msg: String) -> Self {
HttpError(StatusCode::BAD_REQUEST, msg)
}
}
impl From<ParseIntError> for HttpError {
fn from(_e: ParseIntError) -> Self {
//HttpError::from(e.description().to_string())
HttpError::from("Invalid number".to_string())
}
}
impl From<HashError> for HttpError {
fn from(_e: HashError) -> Self {
//HttpError::from(e.description().to_string())
HttpError::from("Invalid hash string".to_string())
}
}
impl From<FromHexError> for HttpError {
fn from(_e: FromHexError) -> Self {
//HttpError::from(e.description().to_string())
HttpError::from("Invalid hex string".to_string())
}
}
impl From<bitcoin::hashes::hex::Error> for HttpError {
fn from(_e: bitcoin::hashes::hex::Error) -> Self {
//HttpError::from(e.description().to_string())
HttpError::from("Invalid hex string".to_string())
}
}
impl From<bitcoin::util::address::Error> for HttpError {
fn from(_e: bitcoin::util::address::Error) -> Self {
//HttpError::from(e.description().to_string())
HttpError::from("Invalid Bitcoin address".to_string())
}
}
impl From<errors::Error> for HttpError {
fn from(e: errors::Error) -> Self {
warn!("errors::Error: {:?}", e);
match e.description().to_string().as_ref() {
"getblock RPC error: {\"code\":-5,\"message\":\"Block not found\"}" => {
HttpError::not_found("Block not found".to_string())
}
_ => HttpError::from(e.to_string()),
}
}
}
impl From<serde_json::Error> for HttpError {
fn from(e: serde_json::Error) -> Self {
HttpError::from(e.to_string())
}
}
impl From<encode::Error> for HttpError {
fn from(e: encode::Error) -> Self {
HttpError::from(e.to_string())
}
}
impl From<std::string::FromUtf8Error> for HttpError {
fn from(e: std::string::FromUtf8Error) -> Self {
HttpError::from(e.to_string())
}
}
...@@ -9,6 +9,9 @@ use std::slice; ...@@ -9,6 +9,9 @@ use std::slice;
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}; use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
use std::thread; use std::thread;
use time; use time;
use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;
pub type Bytes = Vec<u8>; pub type Bytes = Vec<u8>;
pub type HeaderMap = HashMap<Sha256dHash, BlockHeader>; pub type HeaderMap = HashMap<Sha256dHash, BlockHeader>;
...@@ -280,6 +283,24 @@ where ...@@ -280,6 +283,24 @@ where
.unwrap() .unwrap()
} }
pub fn create_socket(addr: &SocketAddr) -> Socket {
let domain = match &addr {
SocketAddr::V4(_) => Domain::ipv4(),
SocketAddr::V6(_) => Domain::ipv6(),
};
let socket =
Socket::new(domain, Type::stream(), Some(Protocol::tcp())).expect("creating socket failed");
#[cfg(unix)]
socket
.set_reuse_port(true)
.expect("cannot enable SO_REUSEPORT");
socket.bind(&addr.clone().into()).expect("cannot bind");
socket
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[test] #[test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment