store.rs 5.41 KB
Newer Older
Roman Zeyde's avatar
Roman Zeyde committed
1
use rocksdb;
2
use std::path::{Path, PathBuf};
3

Roman Zeyde's avatar
Roman Zeyde committed
4
use crate::util::Bytes;
Roman Zeyde's avatar
Roman Zeyde committed
5

kenshin-samourai's avatar
kenshin-samourai committed
6 7 8
//
// A row from the Db store
//
9
#[derive(Clone)]
Roman Zeyde's avatar
Roman Zeyde committed
10 11 12 13 14
pub struct Row {
    pub key: Bytes,
    pub value: Bytes,
}

15
impl Row {
16
    pub fn into_pair(self) -> (Bytes, Bytes) {
17 18 19 20
        (self.key, self.value)
    }
}

kenshin-samourai's avatar
kenshin-samourai committed
21 22 23
//
// Traits for the Db store
//
24
pub trait ReadStore: Sync {
25 26
    fn get(&self, key: &[u8]) -> Option<Bytes>;
    fn scan(&self, prefix: &[u8]) -> Vec<Row>;
27 28 29
}

pub trait WriteStore: Sync {
Roman Zeyde's avatar
Roman Zeyde committed
30
    fn write<I: IntoIterator<Item = Row>>(&self, rows: I);
31
    fn flush(&self);
32 33
}

kenshin-samourai's avatar
kenshin-samourai committed
34 35 36
//
// Options
//
37 38 39 40
/// Settings used when opening and writing to a `DBStore`.
#[derive(Clone)]
struct Options {
    /// Filesystem location of the database.
    path: PathBuf,
    /// When set, the database is opened with auto-compactions disabled and writes are
    /// issued without sync and without the write-ahead log.
    bulk_import: bool,
    /// When set, the compaction read-ahead setting is left at the library default.
    low_memory: bool,
}

kenshin-samourai's avatar
kenshin-samourai committed
44 45 46
//
// Db store
//
47 48
pub struct DBStore {
    db: rocksdb::DB,
49
    opts: Options,
Roman Zeyde's avatar
Roman Zeyde committed
50 51
}

52
impl DBStore {
53 54 55
    fn open_opts(opts: Options) -> Self {
        debug!("opening DB at {:?}", opts.path);
        let mut db_opts = rocksdb::Options::default();
Roman Zeyde's avatar
Roman Zeyde committed
56
        db_opts.create_if_missing(true);
57
        // db_opts.set_keep_log_file_num(10);
58
        db_opts.set_max_open_files(if opts.bulk_import { 16 } else { 256 });
59 60
        db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
        db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
61
        db_opts.set_target_file_size_base(256 << 20);
62
        db_opts.set_write_buffer_size(256 << 20);
63
        db_opts.set_disable_auto_compactions(opts.bulk_import); // for initial bulk load
64
        db_opts.set_advise_random_on_open(!opts.bulk_import); // bulk load uses sequential I/O
Roman Zeyde's avatar
Roman Zeyde committed
65
        if !opts.low_memory {
66 67
            db_opts.set_compaction_readahead_size(1 << 20);
        }
Roman Zeyde's avatar
Roman Zeyde committed
68 69

        let mut block_opts = rocksdb::BlockBasedOptions::default();
70
        block_opts.set_block_size(if opts.low_memory { 256 << 10 } else { 1 << 20 });
71
        DBStore {
72 73
            db: rocksdb::DB::open(&db_opts, &opts.path).unwrap(),
            opts,
74 75 76
        }
    }

77
    /// Opens a new RocksDB at the specified location.
78
    pub fn open(path: &Path, low_memory: bool) -> Self {
79 80 81
        DBStore::open_opts(Options {
            path: path.to_path_buf(),
            bulk_import: true,
82
            low_memory,
83 84 85 86 87
        })
    }

    pub fn enable_compaction(self) -> Self {
        let mut opts = self.opts.clone();
Roman Zeyde's avatar
Roman Zeyde committed
88
        if opts.bulk_import {
89 90
            opts.bulk_import = false;
            info!("enabling auto-compactions");
Roman Zeyde's avatar
Roman Zeyde committed
91 92
            let opts = [("disable_auto_compactions", "false")];
            self.db.set_options(&opts).unwrap();
93
        }
Roman Zeyde's avatar
Roman Zeyde committed
94
        self
Roman Zeyde's avatar
Roman Zeyde committed
95 96
    }

97
    pub fn compact(self) -> Self {
Roman Zeyde's avatar
Roman Zeyde committed
98
        info!("starting full compaction");
Roman Zeyde's avatar
Roman Zeyde committed
99
        self.db.compact_range(None::<&[u8]>, None::<&[u8]>); // would take a while
Roman Zeyde's avatar
Roman Zeyde committed
100
        info!("finished full compaction");
Roman Zeyde's avatar
Roman Zeyde committed
101
        self
Roman Zeyde's avatar
Roman Zeyde committed
102
    }
103 104 105 106 107 108 109 110 111 112

    pub fn iter_scan(&self, prefix: &[u8]) -> ScanIterator {
        ScanIterator {
            prefix: prefix.to_vec(),
            iter: self.db.prefix_iterator(prefix),
            done: false,
        }
    }
}

kenshin-samourai's avatar
kenshin-samourai committed
113 114 115
//
// Iterator supporting scans of the Db store
//
Roman Zeyde's avatar
Roman Zeyde committed
116
pub struct ScanIterator<'a> {
117
    prefix: Vec<u8>,
Roman Zeyde's avatar
Roman Zeyde committed
118
    iter: rocksdb::DBIterator<'a>,
119 120 121
    done: bool,
}

Roman Zeyde's avatar
Roman Zeyde committed
122
impl<'a> Iterator for ScanIterator<'a> {
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
    type Item = Row;

    fn next(&mut self) -> Option<Row> {
        if self.done {
            return None;
        }
        let (key, value) = self.iter.next()?;
        if !key.starts_with(&self.prefix) {
            self.done = true;
            return None;
        }
        Some(Row {
            key: key.to_vec(),
            value: value.to_vec(),
        })
    }
139
}
Roman Zeyde's avatar
Roman Zeyde committed
140

kenshin-samourai's avatar
kenshin-samourai committed
141 142 143
//
// Read functions for the Db store
//
144
impl ReadStore for DBStore {
145 146
    fn get(&self, key: &[u8]) -> Option<Bytes> {
        self.db.get(key).unwrap().map(|v| v.to_vec())
147 148
    }

149 150
    // TODO: use generators
    fn scan(&self, prefix: &[u8]) -> Vec<Row> {
Roman Zeyde's avatar
Roman Zeyde committed
151
        let mut rows = vec![];
152 153 154 155
        for (key, value) in self.db.iterator(rocksdb::IteratorMode::From(
            prefix,
            rocksdb::Direction::Forward,
        )) {
156
            if !key.starts_with(prefix) {
Roman Zeyde's avatar
Roman Zeyde committed
157 158
                break;
            }
159 160 161 162
            rows.push(Row {
                key: key.to_vec(),
                value: value.to_vec(),
            });
Roman Zeyde's avatar
Roman Zeyde committed
163 164 165
        }
        rows
    }
166
}
167

kenshin-samourai's avatar
kenshin-samourai committed
168 169 170
//
// Write functions for the Db store
//
171
impl WriteStore for DBStore {
Roman Zeyde's avatar
Roman Zeyde committed
172
    fn write<I: IntoIterator<Item = Row>>(&self, rows: I) {
173
        let mut batch = rocksdb::WriteBatch::default();
174 175
        for row in rows {
            batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
176 177
        }
        let mut opts = rocksdb::WriteOptions::new();
178 179
        opts.set_sync(!self.opts.bulk_import);
        opts.disable_wal(self.opts.bulk_import);
180 181
        self.db.write_opt(batch, &opts).unwrap();
    }
182 183 184 185

    fn flush(&self) {
        let mut opts = rocksdb::WriteOptions::new();
        opts.set_sync(true);
Roman Zeyde's avatar
Roman Zeyde committed
186
        opts.disable_wal(false);
187 188
        let empty = rocksdb::WriteBatch::default();
        self.db.write_opt(empty, &opts).unwrap();
189
    }
Roman Zeyde's avatar
Roman Zeyde committed
190
}
191

kenshin-samourai's avatar
kenshin-samourai committed
192 193 194
//
// Close function for the Db store
//
195 196
impl Drop for DBStore {
    /// Logs (at trace level) that the store at `opts.path` is being closed.
    fn drop(&mut self) {
        trace!("closing DB at {:?}", self.opts.path);
    }
}
200

kenshin-samourai's avatar
kenshin-samourai committed
201 202 203
//
// Compaction
//
204 205 206 207 208 209 210 211 212
/// Builds the marker row used to record a full compaction: key `F`, empty value.
fn full_compaction_marker() -> Row {
    let key = b"F".to_vec();
    let value = b"".to_vec();
    Row { key, value }
}

/// Writes the full-compaction marker, flushes, compacts the whole store and then
/// enables auto-compactions, returning the store.
///
/// NOTE(review): the marker is written before the compaction runs, so an interrupted
/// compaction may leave the marker behind. Confirm this ordering is intended.
pub fn full_compaction(store: DBStore) -> DBStore {
    store.write(vec![full_compaction_marker()]);
    store.flush();
    store.compact().enable_compaction()
}

218
pub fn is_fully_compacted(store: &dyn ReadStore) -> bool {
219 220 221
    let marker = store.get(&full_compaction_marker().key);
    marker.is_some()
}