Unverified Commit 48f26a53 authored by Roman Zeyde's avatar Roman Zeyde
Browse files

Use `spawn_thread` in metrics and rpc modules

parent f726853e
use prometheus::{self, Encoder};
use std::io;
use std::net::SocketAddr;
use std::thread;
use tiny_http;
pub use prometheus::{HistogramOpts, HistogramTimer, HistogramVec, IntCounter as Counter,
IntGauge as Gauge, Opts as MetricOpts};
use util::spawn_thread;
pub struct Metrics {
reg: prometheus::Registry,
addr: SocketAddr,
......@@ -41,7 +42,7 @@ impl Metrics {
pub fn start(&self) {
let server = tiny_http::Server::http(self.addr).unwrap();
let reg = self.reg.clone();
thread::spawn(move || loop {
spawn_thread("metrics", move || loop {
if let Err(e) = handle_request(&reg, server.recv()) {
error!("http error: {}", e);
}
......
......@@ -13,7 +13,7 @@ use std::thread;
use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use query::Query;
use util::{Channel, HeaderEntry, SyncChannel};
use util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
use errors::*;
......@@ -328,7 +328,7 @@ impl Connection {
pub fn run(mut self) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = thread::spawn(|| Connection::handle_requests(reader, tx));
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
error!(
"[{}] connection handling failed: {}",
......@@ -371,7 +371,7 @@ impl RPC {
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
) {
thread::spawn(move || {
spawn_thread("notification", move || {
for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
match msg {
......@@ -392,7 +392,7 @@ impl RPC {
fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
let chan = Channel::new();
let acceptor = chan.sender();
thread::spawn(move || {
spawn_thread("acceptor", move || {
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
loop {
......@@ -417,7 +417,7 @@ impl RPC {
let notification = Channel::new();
let handle = RPC {
notification: notification.sender(),
server: thread::spawn(move || {
server: spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
......@@ -426,7 +426,7 @@ impl RPC {
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
children.push(thread::spawn(move || {
children.push(spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats);
senders.lock().unwrap().push(conn.chan.sender());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment