1
use std::{
2
    any::Any,
3
    borrow::{Borrow, Cow},
4
    collections::HashMap,
5
    convert::Infallible,
6
    fmt::{Debug, Display},
7
    fs,
8
    ops::{Deref, DerefMut, RangeBounds},
9
    path::{Path, PathBuf},
10
    sync::{
11
        atomic::{AtomicU16, Ordering},
12
        Arc,
13
    },
14
};
15

            
16
use flume::Sender;
17
use once_cell::sync::Lazy;
18
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
19

            
20
use crate::{
21
    context::Context,
22
    error::Error,
23
    io::{fs::StdFileManager, FileManager, ManagedFile, PathId},
24
    transaction::{LogEntry, ManagedTransaction, TransactionId, TransactionManager},
25
    tree::{
26
        self,
27
        root::{AnyReducer, AnyTreeRoot},
28
        state::AnyTreeState,
29
        EmbeddedIndex, KeySequence, Modification, ModificationResult, Operation, PersistenceMode,
30
        ScanEvaluation, SequenceEntry, SequenceId, SequenceIndex, State, TransactableCompaction,
31
        TreeEntry, TreeFile, TreeRoot, TreeValueIndex, VersionedTreeRoot,
32
    },
33
    vault::AnyVault,
34
    ArcBytes, ChunkCache, ErrorKind,
35
};
36

            
37
/// A multi-tree transactional B-Tree database.
38
#[derive(Debug)]
39
pub struct Roots<File: ManagedFile> {
40
    data: Arc<Data<File>>,
41
}
42

            
43
#[derive(Debug)]
44
struct Data<File: ManagedFile> {
45
    context: Context<File::Manager>,
46
    transactions: TransactionManager<File::Manager>,
47
    thread_pool: ThreadPool<File>,
48
    path: PathBuf,
49
    tree_states: Mutex<HashMap<String, Box<dyn AnyTreeState>>>,
50
    tree_paths: Mutex<HashMap<String, PathId>>,
51
}
52

            
53
impl<File: ManagedFile> Roots<File> {
54
17
    fn open<P: Into<PathBuf> + Send>(
55
17
        path: P,
56
17
        context: Context<File::Manager>,
57
17
        thread_pool: ThreadPool<File>,
58
17
    ) -> Result<Self, Error> {
59
17
        let path = path.into();
60
17
        if !path.exists() {
61
            fs::create_dir_all(&path)?;
62
17
        } else if !path.is_dir() {
63
            return Err(Error::from(format!(
64
                "'{:?}' already exists, but is not a directory.",
65
                path
66
            )));
67
17
        }
68

            
69
17
        let transactions = TransactionManager::spawn(&path, context.clone())?;
70
17
        Ok(Self {
71
17
            data: Arc::new(Data {
72
17
                context,
73
17
                path,
74
17
                transactions,
75
17
                thread_pool,
76
17
                tree_states: Mutex::default(),
77
17
                tree_paths: Mutex::default(),
78
17
            }),
79
17
        })
80
17
    }
81

            
82
    /// Returns the path to the database directory.
83
    #[must_use]
84
19
    pub fn path(&self) -> &Path {
85
19
        &self.data.path
86
19
    }
87

            
88
    /// Returns the vault used to encrypt this database.
89
39845
    pub fn context(&self) -> &Context<File::Manager> {
90
39845
        &self.data.context
91
39845
    }
92

            
93
    /// Returns the transaction manager for this database.
94
    #[must_use]
95
15198
    pub fn transactions(&self) -> &TransactionManager<File::Manager> {
96
15198
        &self.data.transactions
97
15198
    }
98

            
99
    /// Opens a tree named `name`.
100
    ///
101
    /// ## Errors
102
    ///
103
    /// - [`InvalidTreeName`](ErrorKind::InvalidTreeName): The name contained an
104
    ///   invalid character. For a full list of valid characters, see the
105
    ///   documentation on [`InvalidTreeName`](ErrorKind::InvalidTreeName).
106
    pub fn tree<Root: tree::Root>(
107
        &self,
108
        root: TreeRoot<Root, File>,
109
    ) -> Result<Tree<Root, File>, Error> {
110
19
        check_name(&root.name)?;
111
19
        let path = self.tree_path(&root.name);
112
19
        if !self.context().file_manager.exists(&path)? {
113
14
            self.context().file_manager.append(&path)?;
114
5
        }
115
19
        let state = self.tree_state(root.clone());
116
19
        Ok(Tree {
117
19
            roots: self.clone(),
118
19
            path,
119
19
            state,
120
19
            vault: root.vault,
121
19
            reducer: root.reducer,
122
19
            name: root.name,
123
19
        })
124
19
    }
125

            
126
24614
    fn tree_path(&self, name: &str) -> PathId {
127
24614
        let mut paths = self.data.tree_paths.lock();
128
24614
        if let Some(id) = paths.get(name) {
129
24595
            id.clone()
130
        } else {
131
19
            let id = self
132
19
                .context()
133
19
                .file_manager
134
19
                .resolve_path(self.path().join(format!("{}.nebari", name)), true)
135
19
                .unwrap();
136
19
            paths.insert(name.to_owned(), id.clone());
137
19
            id
138
        }
139
24614
    }
140

            
141
    /// Removes a tree. Returns true if a tree was deleted.
142
    pub fn delete_tree(&self, name: impl Into<Cow<'static, str>>) -> Result<bool, Error> {
143
        let name = name.into();
144
        let mut tree_states = self.data.tree_states.lock();
145
        self.context()
146
            .file_manager
147
            .delete(self.tree_path(name.as_ref()))?;
148
        Ok(tree_states.remove(name.as_ref()).is_some())
149
    }
150

            
151
    /// Returns a list of all the names of trees contained in this database.
152
    pub fn tree_names(&self) -> Result<Vec<String>, Error> {
153
        let mut names = Vec::new();
154
        // TODO use the file manager
155
        for entry in std::fs::read_dir(self.path())? {
156
            let entry = entry?;
157
            if let Some(name) = entry.file_name().to_str() {
158
                if let Some(without_extension) = name.strip_suffix(".nebari") {
159
                    names.push(without_extension.to_string());
160
                }
161
            }
162
        }
163
        Ok(names)
164
    }
165

            
166
19
    fn tree_state<Root: tree::Root>(&self, root: TreeRoot<Root, File>) -> State<Root> {
167
19
        self.tree_states(&[root])
168
19
            .into_iter()
169
19
            .next()
170
19
            .unwrap()
171
19
            .as_ref()
172
19
            .as_any()
173
19
            .downcast_ref::<State<Root>>()
174
19
            .unwrap()
175
19
            .clone()
176
19
    }
177

            
178
24614
    fn tree_states<R: Borrow<T>, T: AnyTreeRoot<File> + ?Sized>(
179
24614
        &self,
180
24614
        trees: &[R],
181
24614
    ) -> Vec<Box<dyn AnyTreeState>> {
182
24614
        let mut tree_states = self.data.tree_states.lock();
183
24614
        let mut output = Vec::with_capacity(trees.len());
184
49228
        for tree in trees {
185
24614
            let state = tree_states
186
24614
                .entry(tree.borrow().name().to_string())
187
24614
                .or_insert_with(|| tree.borrow().default_state())
188
24614
                .cloned();
189
24614
            output.push(state);
190
24614
        }
191
24614
        output
192
24614
    }
193

            
194
    /// Begins a transaction over `trees`. All trees will be exclusively
195
    /// accessible by the transaction. Dropping the executing transaction will
196
    /// roll the transaction back.
197
    ///
198
    /// ## Errors
199
    ///
200
    /// - [`InvalidTreeName`](ErrorKind::InvalidTreeName): A tree name contained
201
    ///   an invalid character. For a full list of valid characters, see the
202
    ///   documentation on [`InvalidTreeName`](ErrorKind::InvalidTreeName).
203
24595
    pub fn transaction<R: Borrow<T>, T: AnyTreeRoot<File> + ?Sized>(
204
24595
        &self,
205
24595
        trees: &[R],
206
24595
    ) -> Result<ExecutingTransaction<File>, Error> {
207
49190
        for tree in trees {
208
24595
            check_name(tree.borrow().name()).map(|_| tree.borrow().name().as_bytes())?;
209
        }
210
24595
        let transaction = self
211
24595
            .data
212
24595
            .transactions
213
24595
            .new_transaction(trees.iter().map(|t| t.borrow().name().as_bytes()));
214
24595
        let states = self.tree_states(trees);
215
24595
        let trees = trees
216
24595
            .iter()
217
24595
            .zip(states)
218
24595
            .map(|(tree, state)| {
219
24595
                tree.borrow()
220
24595
                    .begin_transaction(
221
24595
                        transaction.id,
222
24595
                        &self.tree_path(tree.borrow().name()),
223
24595
                        state.as_ref(),
224
24595
                        self.context(),
225
24595
                        Some(&self.data.transactions),
226
24595
                    )
227
24595
                    .map(UnlockedTransactionTree::new)
228
24595
            })
229
24595
            .collect::<Result<Vec<_>, Error>>()?;
230
24595
        Ok(ExecutingTransaction {
231
24595
            roots: self.clone(),
232
24595
            transaction: Some(transaction),
233
24595
            trees,
234
24595
        })
235
24595
    }
236
}
237

            
238
24617
fn check_name(name: &str) -> Result<(), Error> {
239
24617
    if name != "_transactions"
240
24616
        && name
241
24616
            .bytes()
242
98548
            .all(|c| matches!(c as char, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '.' | '_'))
243
    {
244
24615
        Ok(())
245
    } else {
246
2
        Err(Error::from(ErrorKind::InvalidTreeName))
247
    }
248
24617
}
249

            
250
impl<File: ManagedFile> Clone for Roots<File> {
251
24646
    fn clone(&self) -> Self {
252
24646
        Self {
253
24646
            data: self.data.clone(),
254
24646
        }
255
24646
    }
256
}
257

            
258
/// An executing transaction. While this exists, no other transactions can
259
/// execute across the same trees as this transaction holds.
260
#[must_use]
261
pub struct ExecutingTransaction<File: ManagedFile> {
262
    roots: Roots<File>,
263
    trees: Vec<UnlockedTransactionTree<File>>,
264
    transaction: Option<ManagedTransaction<File::Manager>>,
265
}
266

            
267
/// A tree that belongs to an [`ExecutingTransaction`].
268
#[must_use]
269
pub struct UnlockedTransactionTree<File: ManagedFile>(Mutex<Box<dyn AnyTransactionTree<File>>>);
270

            
271
impl<File: ManagedFile> UnlockedTransactionTree<File> {
272
24595
    fn new(file: Box<dyn AnyTransactionTree<File>>) -> Self {
273
24595
        Self(Mutex::new(file))
274
24595
    }
275

            
276
    /// Locks this tree so that operations can be performed against it.
277
    ///
278
    /// # Panics
279
    ///
280
    /// This function panics if `Root` does not match the type specified when
281
    /// starting the transaction.
282
24596
    pub fn lock<Root: tree::Root>(&self) -> LockedTransactionTree<'_, Root, File> {
283
24596
        LockedTransactionTree(MutexGuard::map(self.0.lock(), |tree| {
284
24596
            tree.as_mut().as_any_mut().downcast_mut().unwrap()
285
24596
        }))
286
24596
    }
287
}
288

            
289
/// A locked transaction tree. This transactional tree is exclusively available
290
/// for writing and reading to the thread that locks it.
291
#[must_use]
292
pub struct LockedTransactionTree<'transaction, Root: tree::Root, File: ManagedFile>(
293
    MappedMutexGuard<'transaction, TransactionTree<Root, File>>,
294
);
295

            
296
impl<'transaction, Root: tree::Root, File: ManagedFile> Deref
297
    for LockedTransactionTree<'transaction, Root, File>
298
{
299
    type Target = TransactionTree<Root, File>;
300

            
301
    fn deref(&self) -> &Self::Target {
302
        &self.0
303
    }
304
}
305

            
306
impl<'transaction, Root: tree::Root, File: ManagedFile> DerefMut
307
    for LockedTransactionTree<'transaction, Root, File>
308
{
309
24596
    fn deref_mut(&mut self) -> &mut Self::Target {
310
24596
        &mut self.0
311
24596
    }
312
}
313

            
314
impl<File: ManagedFile> ExecutingTransaction<File> {
315
    /// Returns the [`LogEntry`] for this transaction.
316
    #[must_use]
317
    #[allow(clippy::missing_panics_doc)]
318
    pub fn entry(&self) -> &LogEntry<'static> {
319
        self.transaction
320
            .as_ref()
321
            .and_then(|tx| tx.transaction.as_ref())
322
            .unwrap()
323
    }
324

            
325
    /// Returns a mutable reference to the [`LogEntry`] for this transaction.
326
    #[must_use]
327
    #[allow(clippy::missing_panics_doc)]
328
1
    pub fn entry_mut(&mut self) -> &mut LogEntry<'static> {
329
1
        self.transaction
330
1
            .as_mut()
331
1
            .and_then(|tx| tx.transaction.as_mut())
332
1
            .unwrap()
333
1
    }
334

            
335
    /// Commits the transaction. Once this function has returned, all data
336
    /// updates are guaranteed to be able to be accessed by all other readers as
337
    /// well as impervious to sudden failures such as a power outage.
338
    #[allow(clippy::missing_panics_doc)]
339
24592
    pub fn commit(mut self) -> Result<(), Error> {
340
24592
        let trees = std::mem::take(&mut self.trees);
341
        // Write the trees to disk
342
24592
        let trees = self.roots.data.thread_pool.commit_trees(trees)?;
343

            
344
        // Push the transaction to the log.
345
24592
        let transaction = self.transaction.take().unwrap();
346
24592
        let tree_locks = transaction.commit()?;
347

            
348
        // Publish the tree states, now that the transaction has been fully recorded
349
49184
        for tree in trees {
350
24592
            tree.state().publish();
351
24592
        }
352

            
353
        // Release the locks for the trees, allowing a new transaction to begin.
354
24592
        drop(tree_locks);
355
24592

            
356
24592
        Ok(())
357
24592
    }
358

            
359
    /// Rolls the transaction back. It is not necessary to call this function --
360
    /// transactions will automatically be rolled back when the transaction is
361
    /// dropped, if `commit()` isn't called first.
362
2
    pub fn rollback(self) {
363
2
        drop(self);
364
2
    }
365

            
366
    /// Accesses a locked tree.
367
24596
    pub fn tree<Root: tree::Root>(
368
24596
        &self,
369
24596
        index: usize,
370
24596
    ) -> Option<LockedTransactionTree<'_, Root, File>> {
371
24596
        self.unlocked_tree(index).map(UnlockedTransactionTree::lock)
372
24596
    }
373

            
374
    /// Accesses an unlocked tree. Note: If you clone an
375
    /// [`UnlockedTransactionTree`], you must make sure to drop all instances
376
    /// before calling commit.
377
24596
    pub fn unlocked_tree(&self, index: usize) -> Option<&UnlockedTransactionTree<File>> {
378
24596
        self.trees.get(index)
379
24596
    }
380

            
381
3
    fn rollback_tree_states(&mut self) {
382
3
        for tree in self.trees.drain(..) {
383
3
            let tree = tree.0.lock();
384
3
            tree.rollback();
385
3
        }
386
3
    }
387
}
388

            
389
impl<File> Drop for ExecutingTransaction<File>
where
    File: ManagedFile,
{
    /// Rolls the transaction back unless it has already been taken out of
    /// `self`, which is what `commit()` does.
    fn drop(&mut self) {
        let transaction = match self.transaction.take() {
            Some(transaction) => transaction,
            None => return,
        };

        self.rollback_tree_states();
        // Now the transaction can be dropped safely, freeing up access to the trees.
        drop(transaction);
    }
}
398

            
399
/// A tree that is modifiable during a transaction.
400
pub struct TransactionTree<Root: tree::Root, File: ManagedFile> {
401
    pub(crate) transaction_id: TransactionId,
402
    /// The underlying tree file.
403
    pub tree: TreeFile<Root, File>,
404
}
405

            
406
pub trait AnyTransactionTree<File: ManagedFile>: Any + Send + Sync {
407
    fn as_any(&self) -> &dyn Any;
408
    fn as_any_mut(&mut self) -> &mut dyn Any;
409

            
410
    fn state(&self) -> Box<dyn AnyTreeState>;
411

            
412
    fn commit(&mut self) -> Result<(), Error>;
413
    fn rollback(&self);
414
}
415

            
416
impl<Root: tree::Root, File: ManagedFile> AnyTransactionTree<File> for TransactionTree<Root, File> {
417
    fn as_any(&self) -> &dyn Any {
418
        self
419
    }
420
24596
    fn as_any_mut(&mut self) -> &mut dyn Any {
421
24596
        self
422
24596
    }
423

            
424
24592
    fn state(&self) -> Box<dyn AnyTreeState> {
425
24592
        Box::new(self.tree.state.clone())
426
24592
    }
427

            
428
24592
    fn commit(&mut self) -> Result<(), Error> {
429
24592
        self.tree.commit()
430
24592
    }
431

            
432
3
    fn rollback(&self) {
433
3
        let mut state = self.tree.state.lock();
434
3
        state.rollback(&self.tree.state);
435
3
    }
436
}
437

            
438
impl<File: ManagedFile, Index> TransactionTree<VersionedTreeRoot<Index>, File>
439
where
440
    Index: Clone + EmbeddedIndex<ArcBytes<'static>> + Debug + 'static,
441
{
442
    /// Returns the latest sequence id.
443
    #[must_use]
444
    pub fn current_sequence_id(&self) -> SequenceId {
445
        let state = self.tree.state.lock();
446
        state.root.sequence
447
    }
448

            
449
    /// Scans the tree for keys that are contained within `range`. If `forwards`
450
    /// is true, scanning starts at the lowest sort-order key and scans forward.
451
    /// Otherwise, scanning starts at the highest sort-order key and scans
452
    /// backwards. `key_evaluator` is invoked for each key as it is encountered.
453
    /// For all [`ScanEvaluation::ReadData`] results returned, `callback` will be
454
    /// invoked with the key and values. The callback may not be invoked in the
455
    /// same order as the keys are scanned.
456
    pub fn scan_sequences<CallerError, Range, KeyEvaluator, DataCallback>(
457
        &mut self,
458
        range: Range,
459
        forwards: bool,
460
        key_evaluator: &mut KeyEvaluator,
461
        data_callback: &mut DataCallback,
462
    ) -> Result<(), AbortError<CallerError>>
463
    where
464
        Range: RangeBounds<SequenceId> + Debug + 'static,
465
        KeyEvaluator: FnMut(KeySequence<Index>) -> ScanEvaluation,
466
        DataCallback:
467
            FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
468
        CallerError: Display + Debug,
469
    {
470
        self.tree
471
            .scan_sequences(range, forwards, true, key_evaluator, data_callback)
472
    }
473

            
474
    /// Retrieves the keys and values associated with one or more `sequences`.
475
    /// The value retrieved is the value of the key at the given [`SequenceId`].
476
    /// If a sequence is not found, it will not appear in the result map. If
477
    /// the value was removed, None is returned for the value.
478
    pub fn get_multiple_by_sequence<Sequences>(
479
        &mut self,
480
        sequences: Sequences,
481
    ) -> Result<HashMap<SequenceId, (ArcBytes<'static>, Option<ArcBytes<'static>>)>, Error>
482
    where
483
        Sequences: Iterator<Item = SequenceId>,
484
    {
485
        self.tree.get_multiple_by_sequence(sequences, true)
486
    }
487

            
488
    /// Retrieves the keys and indexes associated with one or more `sequences`.
489
    /// The value retrieved is the value of the key at the given [`SequenceId`].
490
    /// If a sequence is not found, it will not appear in the result list.
491
    pub fn get_multiple_indexes_by_sequence<Sequences>(
492
        &mut self,
493
        sequences: Sequences,
494
    ) -> Result<Vec<SequenceIndex<Index>>, Error>
495
    where
496
        Sequences: Iterator<Item = SequenceId>,
497
    {
498
        self.tree.get_multiple_indexes_by_sequence(sequences, true)
499
    }
500

            
501
    /// Retrieves the keys, values, and indexes associated with one or more
502
    /// `sequences`. The value retrieved is the value of the key at the given
503
    /// [`SequenceId`]. If a sequence is not found, it will not appear in the
504
    /// result list.
505
    pub fn get_multiple_with_indexes_by_sequence<Sequences>(
506
        &mut self,
507
        sequences: Sequences,
508
    ) -> Result<HashMap<SequenceId, SequenceEntry<Index>>, Error>
509
    where
510
        Sequences: Iterator<Item = SequenceId>,
511
    {
512
        self.tree
513
            .get_multiple_with_indexes_by_sequence(sequences, true)
514
    }
515
}
516

            
517
impl<Root: tree::Root, File: ManagedFile> TransactionTree<Root, File> {
518
    /// Sets `key` to `value`. Returns the newly created index for this key.
519
16402
    pub fn set(
520
16402
        &mut self,
521
16402
        key: impl Into<ArcBytes<'static>>,
522
16402
        value: impl Into<Root::Value>,
523
16402
    ) -> Result<Root::Index, Error> {
524
16402
        self.tree.set(
525
16402
            PersistenceMode::Transactional(self.transaction_id),
526
16402
            key,
527
16402
            value,
528
16402
        )
529
16402
    }
530

            
531
    /// Executes a modification. Returns a list of all changed keys.
532
    pub fn modify<'a>(
533
        &mut self,
534
        keys: Vec<ArcBytes<'a>>,
535
        operation: Operation<'a, Root::Value, Root::Index>,
536
    ) -> Result<Vec<ModificationResult<Root::Index>>, Error> {
537
        self.tree.modify(Modification {
538
            keys,
539
            persistence_mode: PersistenceMode::Transactional(self.transaction_id),
540
            operation,
541
        })
542
    }
543

            
544
    /// Sets `key` to `value`. Returns a tuple containing two elements:
545
    ///
546
    /// - The previously stored value, if a value was already present.
547
    /// - The new/updated index for this key.
548
    pub fn replace(
549
        &mut self,
550
        key: impl Into<ArcBytes<'static>>,
551
        value: impl Into<Root::Value>,
552
    ) -> Result<(Option<Root::Value>, Root::Index), Error> {
553
        self.tree.replace(key, value, self.transaction_id)
554
    }
555

            
556
    /// Returns the current value of `key`. This will return updated information
557
    /// if it has been previously updated within this transaction.
558
2
    pub fn get(&mut self, key: &[u8]) -> Result<Option<Root::Value>, Error> {
559
2
        self.tree.get(key, true)
560
2
    }
561

            
562
    /// Returns the current index of `key`. This will return updated information
563
    /// if it has been previously updated within this transaction.
564
    pub fn get_index(&mut self, key: &[u8]) -> Result<Option<Root::Index>, Error> {
565
        self.tree.get_index(key, true)
566
    }
567

            
568
    /// Returns the current value and index of `key`. This will return updated
569
    /// information if it has been previously updated within this transaction.
570
    pub fn get_with_index(&mut self, key: &[u8]) -> Result<Option<TreeValueIndex<Root>>, Error> {
571
        self.tree.get_with_index(key, true)
572
    }
573

            
574
    /// Removes `key` and returns the existing value amd index, if present.
575
8192
    pub fn remove(&mut self, key: &[u8]) -> Result<Option<TreeValueIndex<Root>>, Error> {
576
8192
        self.tree.remove(key, self.transaction_id)
577
8192
    }
578

            
579
    /// Compares the value of `key` against `old`. If the values match, key will
580
    /// be set to the new value if `new` is `Some` or removed if `new` is
581
    /// `None`.
582
    pub fn compare_and_swap<Old>(
583
        &mut self,
584
        key: &[u8],
585
        old: Option<&Old>,
586
        new: Option<Root::Value>,
587
    ) -> Result<(), CompareAndSwapError<Root::Value>>
588
    where
589
        Old: PartialEq,
590
        Root::Value: AsRef<Old> + Clone,
591
    {
592
        self.tree
593
            .compare_and_swap(key, old, new, self.transaction_id)
594
    }
595

            
596
    /// Retrieves the values of `keys`. If any keys are not found, they will be
597
    /// omitted from the results. Keys are required to be pre-sorted.
598
    pub fn get_multiple<'keys, KeysIntoIter, KeysIter>(
599
        &mut self,
600
        keys: KeysIntoIter,
601
    ) -> Result<Vec<(ArcBytes<'static>, Root::Value)>, Error>
602
    where
603
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
604
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
605
    {
606
        self.tree.get_multiple(keys, true)
607
    }
608

            
609
    /// Retrieves the indexes of `keys`. If any keys are not found, they will be
610
    /// omitted from the results. Keys are required to be pre-sorted.
611
    pub fn get_multiple_indexes<'keys, KeysIntoIter, KeysIter>(
612
        &mut self,
613
        keys: KeysIntoIter,
614
    ) -> Result<Vec<(ArcBytes<'static>, Root::Index)>, Error>
615
    where
616
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
617
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
618
    {
619
        self.tree.get_multiple_indexes(keys, true)
620
    }
621

            
622
    /// Retrieves the values and indexes of `keys`. If any keys are not found,
623
    /// they will be omitted from the results. Keys are required to be
624
    /// pre-sorted.
625
    pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>(
626
        &mut self,
627
        keys: KeysIntoIter,
628
    ) -> Result<Vec<TreeEntry<Root>>, Error>
629
    where
630
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
631
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
632
    {
633
        self.tree.get_multiple_with_indexes(keys, true)
634
    }
635

            
636
    /// Retrieves all of the values of keys within `range`.
637
    pub fn get_range<'keys, KeyRangeBounds>(
638
        &mut self,
639
        range: &'keys KeyRangeBounds,
640
    ) -> Result<Vec<(ArcBytes<'static>, Root::Value)>, Error>
641
    where
642
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
643
    {
644
        self.tree.get_range(range, true)
645
    }
646

            
647
    /// Retrieves all of the indexes of keys within `range`.
648
    pub fn get_range_indexes<'keys, KeyRangeBounds>(
649
        &mut self,
650
        range: &'keys KeyRangeBounds,
651
    ) -> Result<Vec<(ArcBytes<'static>, Root::Index)>, Error>
652
    where
653
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
654
    {
655
        self.tree.get_range_indexes(range, true)
656
    }
657

            
658
    /// Retrieves all of the values and indexes of keys within `range`.
659
    pub fn get_range_with_indexes<'keys, KeyRangeBounds>(
660
        &mut self,
661
        range: &'keys KeyRangeBounds,
662
    ) -> Result<Vec<TreeEntry<Root>>, Error>
663
    where
664
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
665
    {
666
        self.tree.get_range_with_indexes(range, true)
667
    }
668

            
669
    /// Scans the tree across all nodes that might contain nodes within `range`.
670
    ///
671
    /// If `forwards` is true, the tree is scanned in ascending order.
672
    /// Otherwise, the tree is scanned in descending order.
673
    ///
674
    /// `node_evaluator` is invoked for each [`Interior`](crate::tree::Interior)
675
    /// node to determine if the node should be traversed. The parameters to the
676
    /// callback are:
677
    ///
678
    /// - `&ArcBytes<'static>`: The maximum key stored within the all children
679
    ///   nodes.
680
    /// - `&Root::ReducedIndex`: The reduced index value stored within the node.
681
    /// - `usize`: The depth of the node. The root nodes are depth 0.
682
    ///
683
    /// The result of the callback is a [`ScanEvaluation`]. To read children
684
    /// nodes, return [`ScanEvaluation::ReadData`].
685
    ///
686
    /// `key_evaluator` is invoked for each key encountered that is contained
687
    /// within `range`. For all [`ScanEvaluation::ReadData`] results returned,
688
    /// `callback` will be invoked with the key and values. `callback` may not
689
    /// be invoked in the same order as the keys are scanned.
690
    #[cfg_attr(
691
        feature = "tracing",
692
        tracing::instrument(skip(self, node_evaluator, key_evaluator, callback))
693
    )]
694
    pub fn scan<'b, 'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
695
        &mut self,
696
        range: &'keys KeyRangeBounds,
697
        forwards: bool,
698
        mut node_evaluator: NodeEvaluator,
699
        mut key_evaluator: KeyEvaluator,
700
        mut callback: DataCallback,
701
    ) -> Result<(), AbortError<CallerError>>
702
    where
703
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
704
        NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
705
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
706
        DataCallback: FnMut(
707
            ArcBytes<'static>,
708
            &Root::Index,
709
            Root::Value,
710
        ) -> Result<(), AbortError<CallerError>>,
711
        CallerError: Display + Debug,
712
    {
713
        self.tree.scan(
714
            range,
715
            forwards,
716
            true,
717
            &mut node_evaluator,
718
            &mut key_evaluator,
719
            &mut callback,
720
        )
721
    }
722

            
723
    /// Returns the reduced index over the provided range. This is an
724
    /// aggregation function that builds atop the `scan()` operation which calls
725
    /// [`Reducer::reduce()`](crate::tree::Reducer::reduce) and
726
    /// [`Reducer::rereduce()`](crate::tree::Reducer::rereduce) on all matching
727
    /// indexes stored within the nodes of this tree, producing a single
728
    /// aggregated [`Root::ReducedIndex`](tree::Root::ReducedIndex) value.
729
    ///
730
    /// If no keys match, the returned result is what
731
    /// [`Reducer::rereduce()`](crate::tree::Reducer::rereduce) returns when an
732
    /// empty slice is provided.
733
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
734
    pub fn reduce<'keys, KeyRangeBounds>(
735
        &mut self,
736
        range: &'keys KeyRangeBounds,
737
    ) -> Result<Option<Root::ReducedIndex>, Error>
738
    where
739
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
740
    {
741
        self.tree.reduce(range, true)
742
    }
743

            
744
    /// Returns the first key of the tree.
745
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
746
    pub fn first_key(&mut self) -> Result<Option<ArcBytes<'static>>, Error> {
747
        self.tree.first_key(true)
748
    }
749

            
750
    /// Returns the first key and value of the tree.
751
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
752
    pub fn first(&mut self) -> Result<Option<(ArcBytes<'static>, Root::Value)>, Error> {
753
        self.tree.first(true)
754
    }
755

            
756
    /// Returns the last key of the tree.
757
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
758
    pub fn last_key(&mut self) -> Result<Option<ArcBytes<'static>>, Error> {
759
        self.tree.last_key(true)
760
    }
761

            
762
    /// Returns the last key and value of the tree.
763
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
764
    pub fn last(&mut self) -> Result<Option<(ArcBytes<'static>, Root::Value)>, Error> {
765
        self.tree.last(true)
766
    }
767
}
768

            
769
/// An error returned from `compare_and_swap()`.
770
#[derive(Debug, thiserror::Error)]
771
pub enum CompareAndSwapError<Value: Debug> {
772
    /// The stored value did not match the conditional value.
773
    #[error("value did not match. existing value: {0:?}")]
774
    Conflict(Option<Value>),
775
    /// Another error occurred while executing the operation.
776
    #[error("error during compare_and_swap: {0}")]
777
    Error(#[from] Error),
778
}
779

            
780
/// A database configuration used to open a database.
781
#[derive(Debug)]
782
#[must_use]
783
pub struct Config<M: FileManager = StdFileManager> {
784
    path: PathBuf,
785
    vault: Option<Arc<dyn AnyVault>>,
786
    cache: Option<ChunkCache>,
787
    file_manager: Option<M>,
788
    thread_pool: Option<ThreadPool<M::File>>,
789
}
790

            
791
impl<M: FileManager> Clone for Config<M> {
    /// Returns a copy of this configuration, cloning every field.
    fn clone(&self) -> Self {
        Self {
            path: self.path.clone(),
            vault: self.vault.clone(),
            cache: self.cache.clone(),
            file_manager: self.file_manager.clone(),
            thread_pool: self.thread_pool.clone(),
        }
    }
}
802

            
803
impl Config<StdFileManager> {
804
    /// Creates a new config to open a database located at `path`.
805
16
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
806
16
        Self {
807
16
            path: path.as_ref().to_path_buf(),
808
16
            vault: None,
809
16
            cache: None,
810
16
            thread_pool: None,
811
16
            file_manager: None,
812
16
        }
813
16
    }
814

            
815
    /// Returns a default configuration to open a database located at `path`.
816
    pub fn default_for<P: AsRef<Path>>(path: P) -> Self {
817
        Self {
818
            path: path.as_ref().to_path_buf(),
819
            vault: None,
820
            cache: Some(ChunkCache::new(2000, 65536)),
821
            thread_pool: Some(ThreadPool::default()),
822
            file_manager: None,
823
        }
824
    }
825

            
826
    /// Sets the file manager.
827
    ///
828
    /// ## Panics
829
    ///
830
    /// Panics if called after a shared thread pool has been set.
831
10
    pub fn file_manager<M: FileManager>(self, file_manager: M) -> Config<M> {
832
10
        assert!(self.thread_pool.is_none());
833
10
        Config {
834
10
            path: self.path,
835
10
            vault: self.vault,
836
10
            cache: self.cache,
837
10
            file_manager: Some(file_manager),
838
10
            thread_pool: None,
839
10
        }
840
10
    }
841
}
842

            
843
impl<M: FileManager> Config<M> {
844
    /// Sets the vault to use for this database.
845
2
    pub fn vault<V: AnyVault>(mut self, vault: V) -> Self {
846
2
        self.vault = Some(Arc::new(vault));
847
2
        self
848
2
    }
849

            
850
    /// Sets the chunk cache to use for this database.
851
    pub fn cache(mut self, cache: ChunkCache) -> Self {
852
        self.cache = Some(cache);
853
        self
854
    }
855

            
856
    /// Uses the `thread_pool` provided instead of creating its own. This will
857
    /// allow a single thread pool to manage multiple [`Roots`] instances'
858
    /// transactions.
859
    pub fn shared_thread_pool(mut self, thread_pool: &ThreadPool<M::File>) -> Self {
860
        self.thread_pool = Some(thread_pool.clone());
861
        self
862
    }
863

            
864
    /// Opens the database, or creates one if the target path doesn't exist.
865
17
    pub fn open(self) -> Result<Roots<M::File>, Error> {
866
17
        Roots::open(
867
17
            self.path,
868
17
            Context {
869
17
                file_manager: self.file_manager.unwrap_or_default(),
870
17
                vault: self.vault,
871
17
                cache: self.cache,
872
17
            },
873
17
            self.thread_pool.unwrap_or_default(),
874
17
        )
875
17
    }
876
}
877

            
878
/// A named collection of keys and values.
879
pub struct Tree<Root: tree::Root, File: ManagedFile> {
880
    roots: Roots<File>,
881
    path: PathId,
882
    state: State<Root>,
883
    reducer: Arc<dyn AnyReducer>,
884
    vault: Option<Arc<dyn AnyVault>>,
885
    name: Cow<'static, str>,
886
}
887

            
888
impl<Root: tree::Root, File: ManagedFile> Clone for Tree<Root, File> {
    /// Returns another handle to the same tree, cloning every field.
    fn clone(&self) -> Self {
        Self {
            roots: self.roots.clone(),
            path: self.path.clone(),
            state: self.state.clone(),
            vault: self.vault.clone(),
            reducer: self.reducer.clone(),
            name: self.name.clone(),
        }
    }
}
900

            
901
impl<Root: tree::Root, File: ManagedFile> Tree<Root, File> {
902
    /// Returns the name of the tree.
903
    #[must_use]
904
    pub fn name(&self) -> &str {
905
        &self.name
906
    }
907

            
908
    /// Returns the path to the file for this tree.
909
    #[must_use]
910
    pub fn path(&self) -> &Path {
911
        self.path.path()
912
    }
913

            
914
    /// Returns the number of keys stored in the tree. Does not include deleted keys.
915
    #[must_use]
916
3506
    pub fn count(&self) -> u64 {
917
3506
        let state = self.state.lock();
918
3506
        state.root.count()
919
3506
    }
920

            
921
    /// Sets `key` to `value`. This is executed within its own transaction.
922
    #[allow(clippy::missing_panics_doc)]
923
16398
    pub fn set(
924
16398
        &self,
925
16398
        key: impl Into<ArcBytes<'static>>,
926
16398
        value: impl Into<Root::Value>,
927
16398
    ) -> Result<(), Error> {
928
16398
        let transaction = self.begin_transaction()?;
929
16398
        transaction.tree::<Root>(0).unwrap().set(key, value)?;
930
16398
        transaction.commit()
931
16398
    }
932

            
933
24590
    fn begin_transaction(&self) -> Result<ExecutingTransaction<File>, Error> {
934
24590
        let reducer = self
935
24590
            .reducer
936
24590
            .as_ref()
937
24590
            .as_any()
938
24590
            .downcast_ref::<Root::Reducer>()
939
24590
            .unwrap()
940
24590
            .clone();
941
24590
        let mut root = Root::tree_with_reducer(self.name.clone(), reducer);
942
24590
        if let Some(vault) = &self.vault {
943
1
            root.vault = Some(vault.clone());
944
24589
        }
945
24590
        self.roots.transaction(&[root])
946
24590
    }
947

            
948
    /// Returns a [`TreeFile`] for lower-level operations within the context of
949
    /// Roots.
950
    ///
951
    /// Using this direct access, it is possible to circumvent some of the
952
    /// safety provided by Roots (e.g., passing an incorrect value for
953
    /// `in_transaction`). This function is provided for those who are
954
    /// implementing custom roots and wish to expose functionality through
955
    /// Roots.
956
11700
    pub fn open_for_read(&self) -> Result<TreeFile<Root, File>, Error> {
957
11700
        let context = self.vault.as_ref().map_or_else(
958
11700
            || Cow::Borrowed(self.roots.context()),
959
11700
            |vault| Cow::Owned(self.roots.context().clone().with_any_vault(vault.clone())),
960
11700
        );
961
11700

            
962
11700
        TreeFile::<Root, File>::read(
963
11700
            &self.path,
964
11700
            self.state.clone(),
965
11700
            &context,
966
11700
            Some(self.roots.transactions()),
967
11700
        )
968
11700
    }
969

            
970
    /// Returns a [`TreeFile`] for lower-level operations within the context of
971
    /// Roots.
972
    ///
973
    /// Using this direct access, it is possible to circumvent some of the
974
    /// safety provided by Roots (e.g., passing an incorrect value for
975
    /// `in_transaction`). This function is provided for those who are
976
    /// implementing custom roots and wish to expose functionality through
977
    /// Roots.
978
    pub fn open_for_write(&self) -> Result<TreeFile<Root, File>, Error> {
979
        let context = self.vault.as_ref().map_or_else(
980
            || Cow::Borrowed(self.roots.context()),
981
            |vault| Cow::Owned(self.roots.context().clone().with_any_vault(vault.clone())),
982
        );
983

            
984
        TreeFile::<Root, File>::write(
985
            &self.path,
986
            self.state.clone(),
987
            &context,
988
            Some(self.roots.transactions()),
989
        )
990
    }
991

            
992
    /// Retrieves the current value of `key`, if present. Does not reflect any
993
    /// changes in pending transactions.
994
8202
    pub fn get(&self, key: &[u8]) -> Result<Option<Root::Value>, Error> {
995
8202
        catch_compaction_and_retry(|| {
996
8202
            let mut tree = match self.open_for_read() {
997
8200
                Ok(tree) => tree,
998
2
                Err(err) if err.kind.is_file_not_found() => return Ok(None),
999
2
                Err(err) => return Err(err),
            };

            
8200
            tree.get(key, false)
8202
        })
8202
    }

            
    /// Looks up the index currently stored for `key`, if present. Changes
    /// made by pending transactions are not reflected.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    pub fn get_index(&self, key: &[u8]) -> Result<Option<Root::Index>, Error> {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_index(key, false),
            // A missing tree file is reported as an absent key.
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Looks up both the value and the index currently stored for `key`, if
    /// present. Changes made by pending transactions are not reflected.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    pub fn get_with_index(&self, key: &[u8]) -> Result<Option<TreeValueIndex<Root>>, Error> {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_with_index(key, false),
            // A missing tree file is reported as an absent key.
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Stores `value` under `key` inside a transaction that is committed
    /// before returning. The result is a tuple of:
    ///
    /// - The previously stored value, if a value was already present.
    /// - The new/updated index for this key.
    #[allow(clippy::missing_panics_doc)]
    pub fn replace(
        &self,
        key: impl Into<ArcBytes<'static>>,
        value: impl Into<Root::Value>,
    ) -> Result<(Option<Root::Value>, Root::Index), Error> {
        let txn = self.begin_transaction()?;
        // This tree is the only entry of the transaction, hence index 0.
        let outcome = txn.tree::<Root>(0).unwrap().replace(key, value)?;
        txn.commit().map(|()| outcome)
    }

            
    /// Applies `operation` to `keys` inside a transaction that is committed
    /// before returning. Returns a list of all changed keys.
    #[allow(clippy::missing_panics_doc)]
    pub fn modify<'a>(
        &self,
        keys: Vec<ArcBytes<'a>>,
        operation: Operation<'a, Root::Value, Root::Index>,
    ) -> Result<Vec<ModificationResult<Root::Index>>, Error> {
        let txn = self.begin_transaction()?;
        // This tree is the only entry of the transaction, hence index 0.
        let changes = txn.tree::<Root>(0).unwrap().modify(keys, operation)?;
        txn.commit().map(|()| changes)
    }

            
    /// Removes `key` and returns the existing value and index, if present. This
    /// is executed within its own transaction.
    #[allow(clippy::missing_panics_doc)]
8192
    pub fn remove(&self, key: &[u8]) -> Result<Option<TreeValueIndex<Root>>, Error> {
8192
        let transaction = self.begin_transaction()?;
8192
        let existing_value = transaction.tree::<Root>(0).unwrap().remove(key)?;
8192
        transaction.commit()?;
8192
        Ok(existing_value)
8192
    }

            
    /// Compares the value of `key` against `old`. If the values match, key will
    /// be set to the new value if `new` is `Some` or removed if `new` is
    /// `None`. This is executed within its own transaction.
    #[allow(clippy::missing_panics_doc)]
    pub fn compare_and_swap<Old>(
        &self,
        key: &[u8],
        old: Option<&Old>,
        new: Option<Root::Value>,
    ) -> Result<(), CompareAndSwapError<Root::Value>>
    where
        Old: PartialEq,
        Root::Value: AsRef<Old> + Clone,
    {
        let transaction = self.begin_transaction()?;
        transaction
            .tree::<Root>(0)
            .unwrap()
            .compare_and_swap(key, old, new)?;
        transaction.commit()?;
        Ok(())
    }

            
    /// Fetches the values stored for `keys`. Keys that are not found are
    /// left out of the results. Keys are required to be pre-sorted.
    ///
    /// Returns an empty list if the tree's file does not exist.
    #[allow(clippy::needless_pass_by_value)]
    pub fn get_multiple<'keys, Keys>(
        &self,
        keys: Keys,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Value)>, Error>
    where
        Keys: Iterator<Item = &'keys [u8]> + ExactSizeIterator + Clone,
    {
        // `keys` is cloned on every invocation because the closure may be run
        // more than once.
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_multiple(keys.clone(), false),
            Err(error) if error.kind.is_file_not_found() => Ok(Vec::new()),
            Err(error) => Err(error),
        })
    }

            
    /// Fetches the indexes stored for `keys`. Keys that are not found are
    /// left out of the results. Keys are required to be pre-sorted.
    ///
    /// Returns an empty list if the tree's file does not exist.
    #[allow(clippy::needless_pass_by_value)]
    pub fn get_multiple_indexes<'keys, KeysIntoIter, KeysIter>(
        &self,
        keys: KeysIntoIter,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Index)>, Error>
    where
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter> + Clone,
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
    {
        // `keys` is cloned on every invocation because the closure may be run
        // more than once.
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_multiple_indexes(keys.clone(), false),
            Err(error) if error.kind.is_file_not_found() => Ok(Vec::new()),
            Err(error) => Err(error),
        })
    }

            
    /// Fetches the values and indexes stored for `keys`. Keys that are not
    /// found are left out of the results. Keys are required to be
    /// pre-sorted.
    ///
    /// Returns an empty list if the tree's file does not exist.
    #[allow(clippy::needless_pass_by_value)]
    pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>(
        &self,
        keys: KeysIntoIter,
    ) -> Result<Vec<TreeEntry<Root>>, Error>
    where
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter> + Clone,
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
    {
        // `keys` is cloned on every invocation because the closure may be run
        // more than once.
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_multiple_with_indexes(keys.clone(), false),
            Err(error) if error.kind.is_file_not_found() => Ok(Vec::new()),
            Err(error) => Err(error),
        })
    }

            
    /// Fetches the keys and values of every key within `range`.
    ///
    /// Returns an empty list if the tree's file does not exist.
    pub fn get_range<'keys, KeyRangeBounds>(
        &self,
        range: &'keys KeyRangeBounds,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Value)>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
    {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_range(range, false),
            Err(error) if error.kind.is_file_not_found() => Ok(Vec::new()),
            Err(error) => Err(error),
        })
    }

            
    /// Fetches the keys and indexes of every key within `range`.
    ///
    /// Returns an empty list if the tree's file does not exist.
    pub fn get_range_indexes<'keys, KeyRangeBounds>(
        &self,
        range: &'keys KeyRangeBounds,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Index)>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_range_indexes(range, false),
            Err(error) if error.kind.is_file_not_found() => Ok(Vec::new()),
            Err(error) => Err(error),
        })
    }

            
    /// Fetches the keys, values, and indexes of every key within `range`.
    ///
    /// Returns an empty list if the tree's file does not exist.
    pub fn get_range_with_indexes<'keys, KeyRangeBounds>(
        &self,
        range: &'keys KeyRangeBounds,
    ) -> Result<Vec<TreeEntry<Root>>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.get_range_with_indexes(range, false),
            Err(error) if error.kind.is_file_not_found() => Ok(Vec::new()),
            Err(error) => Err(error),
        })
    }

            
    /// Scans the tree across all nodes that might contain nodes within `range`.
    ///
    /// If `forwards` is true, the tree is scanned in ascending order.
    /// Otherwise, the tree is scanned in descending order.
    ///
    /// `node_evaluator` is invoked for each [`Interior`](crate::tree::Interior) node to determine if
    /// the node should be traversed. The parameters to the callback are:
    ///
    /// - `&ArcBytes<'static>`: The maximum key stored within the all children
    ///   nodes.
    /// - `&Root::ReducedIndex`: The reduced index value stored within the node.
    /// - `usize`: The depth of the node. The root nodes are depth 0.
    ///
    /// The result of the callback is a [`ScanEvaluation`]. To read children
    /// nodes, return [`ScanEvaluation::ReadData`].
    ///
    /// `key_evaluator` is invoked for each key encountered that is contained
    /// within `range`. For all [`ScanEvaluation::ReadData`] results returned,
    /// `callback` will be invoked with the key and values. `callback` may not
    /// be invoked in the same order as the keys are scanned.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(skip(self, node_evaluator, key_evaluator, callback))
    )]
    pub fn scan<'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
        &self,
        range: &'keys KeyRangeBounds,
        forwards: bool,
        mut node_evaluator: NodeEvaluator,
        mut key_evaluator: KeyEvaluator,
        mut callback: DataCallback,
    ) -> Result<(), AbortError<CallerError>>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
        NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
        DataCallback: FnMut(
            ArcBytes<'static>,
            &Root::Index,
            Root::Value,
        ) -> Result<(), AbortError<CallerError>>,
        CallerError: Display + Debug,
    {
        catch_compaction_and_retry_abortable(move || {
            let mut tree = match self.open_for_read() {
                Ok(tree) => tree,
                Err(err) if err.kind.is_file_not_found() => return Ok(()),
                Err(err) => return Err(AbortError::from(err)),
            };

            
            tree.scan(
                range,
                forwards,
                false,
                &mut node_evaluator,
                &mut key_evaluator,
                &mut callback,
            )
        })
    }

            
    /// Computes the reduced index over `range`. This is an aggregation
    /// function that builds atop the `scan()` operation which calls
    /// [`Reducer::reduce()`](crate::tree::Reducer::reduce) and
    /// [`Reducer::rereduce()`](crate::tree::Reducer::rereduce) on all matching
    /// indexes stored within the nodes of this tree, producing a single
    /// aggregated [`Root::ReducedIndex`](tree::Root::ReducedIndex) value.
    ///
    /// If no keys match, the returned result is what
    /// [`Reducer::rereduce()`](crate::tree::Reducer::rereduce) returns when an
    /// empty slice is provided.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn reduce<'keys, KeyRangeBounds>(
        &self,
        range: &'keys KeyRangeBounds,
    ) -> Result<Option<Root::ReducedIndex>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
    {
        catch_compaction_and_retry(move || match self.open_for_read() {
            Ok(mut file) => file.reduce(range, false),
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Returns the first key of the tree.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn first_key(&self) -> Result<Option<ArcBytes<'static>>, Error> {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.first_key(false),
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Returns the first key and value of the tree.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn first(&self) -> Result<Option<(ArcBytes<'static>, Root::Value)>, Error> {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.first(false),
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Returns the last key of the tree.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn last_key(&self) -> Result<Option<ArcBytes<'static>>, Error> {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.last_key(false),
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Returns the last key and value of the tree.
    ///
    /// Returns `Ok(None)` if the tree's file does not exist.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn last(&self) -> Result<Option<(ArcBytes<'static>, Root::Value)>, Error> {
        catch_compaction_and_retry(|| match self.open_for_read() {
            Ok(mut file) => file.last(false),
            Err(error) if error.kind.is_file_not_found() => Ok(None),
            Err(error) => Err(error),
        })
    }

            
    /// Rewrites the database to remove data that is no longer current. Because
    /// Nebari uses an append-only format, this is helpful in reducing disk
    /// usage.
    ///
    /// See [`TreeFile::compact()`](crate::tree::TreeFile::compact) for more
    /// information.
3498
    pub fn compact(&self) -> Result<(), Error> {
3498
        let tree = match self.open_for_read() {
3498
            Ok(tree) => tree,
            Err(err) if err.kind.is_file_not_found() => return Ok(()),
            Err(err) => return Err(err),
        };
3498
        tree.compact(
3498
            &self.roots.context().file_manager,
3498
            Some(TransactableCompaction {
3498
                name: self.name.as_ref(),
3498
                manager: self.roots.transactions(),
3498
            }),
3498
        )?;
3498
        Ok(())
3498
    }
}

            
impl<Root: tree::Root, File: ManagedFile> AnyTreeRoot<File> for Tree<Root, File> {
    /// Returns the name of the tree.
    fn name(&self) -> &str {
        &*self.name
    }

    /// Builds a fresh [`State`] for this tree whose root is created from this
    /// tree's reducer.
    ///
    /// Panics if the stored reducer is not a `Root::Reducer`.
    fn default_state(&self) -> Box<dyn AnyTreeState> {
        // The reducer is stored type-erased; recover the concrete type.
        let reducer = self
            .reducer
            .as_ref()
            .as_any()
            .downcast_ref::<Root::Reducer>()
            .unwrap()
            .clone();
        Box::new(State::<Root>::new(
            None,
            None,
            Root::default_with(reducer),
        ))
    }

    /// Opens this tree's file for writing and wraps it, together with
    /// `transaction_id`, in a [`TransactionTree`].
    ///
    /// If this tree has its own vault, it is applied to a copy of `context`
    /// before the file is opened.
    ///
    /// Panics if `state` is not a `State<Root>`.
    fn begin_transaction(
        &self,
        transaction_id: TransactionId,
        file_path: &PathId,
        state: &dyn AnyTreeState,
        context: &Context<File::Manager>,
        transactions: Option<&TransactionManager<File::Manager>>,
    ) -> Result<Box<dyn AnyTransactionTree<File>>, Error> {
        let context = match &self.vault {
            Some(vault) => Cow::Owned(context.clone().with_any_vault(vault.clone())),
            None => Cow::Borrowed(context),
        };
        // The state arrives type-erased; recover the concrete type.
        let typed_state = state
            .as_any()
            .downcast_ref::<State<Root>>()
            .unwrap()
            .clone();
        let tree = TreeFile::write(file_path, typed_state, &context, transactions)?;

        Ok(Box::new(TransactionTree {
            transaction_id,
            tree,
        }))
    }
}

            
impl<File: ManagedFile, Index> Tree<VersionedTreeRoot<Index>, File>
where
    Index: EmbeddedIndex<ArcBytes<'static>> + Clone + Debug + 'static,
{
    /// Returns the latest sequence id, read from the tree's state while
    /// holding its lock.
    #[must_use]
    pub fn current_sequence_id(&self) -> SequenceId {
        self.state.lock().root.sequence
    }

            
    /// Scans the tree for keys that are contained within `range`. If `forwards`
    /// is true, scanning starts at the lowest sort-order key and scans forward.
    /// Otherwise, scanning starts at the highest sort-order key and scans
    /// backwards. `key_evaluator` is invoked for each key as it is encountered.
    /// For all [`ScanEvaluation::ReadData`] results returned, `callback` will be
    /// invoked with the key and values. The callback may not be invoked in the
    /// same order as the keys are scanned.
    pub fn scan_sequences<CallerError, Range, KeyEvaluator, DataCallback>(
        &self,
        range: Range,
        forwards: bool,
        mut key_evaluator: KeyEvaluator,
        mut data_callback: DataCallback,
    ) -> Result<(), AbortError<CallerError>>
    where
        Range: Clone + RangeBounds<SequenceId> + Debug + 'static,
        KeyEvaluator: FnMut(KeySequence<Index>) -> ScanEvaluation,
        DataCallback:
            FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
        CallerError: Display + Debug,
    {
        catch_compaction_and_retry_abortable(|| {
            let mut tree = TreeFile::<VersionedTreeRoot<Index>, File>::read(
                &self.path,
                self.state.clone(),
                self.roots.context(),
                Some(self.roots.transactions()),
            )?;

            
            tree.scan_sequences(
                range.clone(),
                forwards,
                false,
                &mut key_evaluator,
                &mut data_callback,
            )
        })
    }

            
    /// Retrieves the keys and values associated with one or more `sequences`.
    /// The value retrieved is the value of the key at the given [`SequenceId`].
    /// If a sequence is not found, it will not appear in the result map. If
    /// the value was removed, None is returned for the value.
    ///
    /// The file is opened through [`Tree::open_for_read()`], so a
    /// tree-specific vault is honored. Returns an empty map if the tree's
    /// file does not exist.
    #[allow(clippy::needless_pass_by_value)]
    pub fn get_multiple_by_sequence<Sequences>(
        &self,
        sequences: Sequences,
    ) -> Result<HashMap<SequenceId, (ArcBytes<'static>, Option<ArcBytes<'static>>)>, Error>
    where
        Sequences: Iterator<Item = SequenceId> + Clone,
    {
        catch_compaction_and_retry(|| {
            let mut tree = match self.open_for_read() {
                Ok(tree) => tree,
                // A missing tree file holds no sequences.
                Err(err) if err.kind.is_file_not_found() => return Ok(HashMap::new()),
                Err(err) => return Err(err),
            };

            tree.get_multiple_by_sequence(sequences.clone(), false)
        })
    }

            
    /// Retrieves the keys and indexes associated with one or more `sequences`.
    /// The value retrieved is the value of the key at the given [`SequenceId`].
    /// If a sequence is not found, it will not appear in the result list.
    ///
    /// The file is opened through [`Tree::open_for_read()`], so a
    /// tree-specific vault is honored. Returns an empty list if the tree's
    /// file does not exist.
    #[allow(clippy::needless_pass_by_value)]
    pub fn get_multiple_indexes_by_sequence<Sequences>(
        &self,
        sequences: Sequences,
    ) -> Result<Vec<SequenceIndex<Index>>, Error>
    where
        Sequences: Iterator<Item = SequenceId> + Clone,
    {
        catch_compaction_and_retry(|| {
            let mut tree = match self.open_for_read() {
                Ok(tree) => tree,
                // A missing tree file holds no sequences.
                Err(err) if err.kind.is_file_not_found() => return Ok(Vec::new()),
                Err(err) => return Err(err),
            };

            tree.get_multiple_indexes_by_sequence(sequences.clone(), false)
        })
    }

            
    /// Retrieves the keys, values, and indexes associated with one or more
    /// `sequences`. The value retrieved is the value of the key at the given
    /// [`SequenceId`]. If a sequence is not found, it will not appear in the
    /// result map.
    ///
    /// The file is opened through [`Tree::open_for_read()`], so a
    /// tree-specific vault is honored. Returns an empty map if the tree's
    /// file does not exist.
    #[allow(clippy::needless_pass_by_value)]
    pub fn get_multiple_with_indexes_by_sequence<Sequences>(
        &self,
        sequences: Sequences,
    ) -> Result<HashMap<SequenceId, SequenceEntry<Index>>, Error>
    where
        Sequences: Iterator<Item = SequenceId> + Clone,
    {
        catch_compaction_and_retry(|| {
            let mut tree = match self.open_for_read() {
                Ok(tree) => tree,
                // A missing tree file holds no sequences.
                Err(err) if err.kind.is_file_not_found() => return Ok(HashMap::new()),
                Err(err) => return Err(err),
            };

            tree.get_multiple_with_indexes_by_sequence(sequences.clone(), false)
        })
    }
}

            
/// An error that could come from user code or Nebari.
#[derive(thiserror::Error, Debug)]
pub enum AbortError<CallerError: Display + Debug = Infallible> {
    /// An error unrelated to Nebari occurred.
    #[error("other error: {0}")]
    Other(CallerError),
    /// An error from Roots occurred.
    #[error("database error: {0}")]
    Nebari(#[from] Error),
}

            
impl AbortError<Infallible> {
    /// Extracts the Nebari error from an abort error whose caller error type
    /// is [`Infallible`].
    #[must_use]
    pub fn infallible(self) -> Error {
        match self {
            Self::Nebari(database_error) => database_error,
            // `Infallible` has no values, so this arm can never be reached.
            Self::Other(never) => match never {},
        }
    }
}

            
/// A pool of worker threads that commits the trees of a transaction to disk
/// in parallel.
///
/// Clones share the same work queue and thread counter.
#[derive(Debug)]
pub struct ThreadPool<File: ManagedFile> {
    /// Sending half of the work queue; one [`ThreadCommit`] is queued per
    /// tree to commit.
    sender: flume::Sender<ThreadCommit<File>>,
    /// Receiving half of the work queue; a clone is handed to each worker
    /// thread that is spawned.
    receiver: flume::Receiver<ThreadCommit<File>>,
    /// Counter that is incremented each time a worker thread is spawned.
    thread_count: Arc<AtomicU16>,
    /// Upper bound on the number of worker threads this pool will spawn.
    maximum_threads: usize,
}

            
impl<File: ManagedFile> ThreadPool<File> {
    /// Returns a thread pool that will spawn up to `maximum_threads` to process
    /// file operations.
    #[must_use]
17
    pub fn new(maximum_threads: usize) -> Self {
17
        let (sender, receiver) = flume::unbounded();
17
        Self {
17
            sender,
17
            receiver,
17
            thread_count: Arc::new(AtomicU16::new(0)),
17
            maximum_threads,
17
        }
17
    }

            
24592
    fn commit_trees(
24592
        &self,
24592
        trees: Vec<UnlockedTransactionTree<File>>,
24592
    ) -> Result<Vec<Box<dyn AnyTransactionTree<File>>>, Error> {
24592
        // If we only have one tree, there's no reason to split IO across
24592
        // threads. If we have multiple trees, we should split even with one
24592
        // cpu: if one thread blocks, the other can continue executing.
24592
        if trees.len() == 1 {
24592
            let mut tree = trees.into_iter().next().unwrap().0.into_inner();
24592
            tree.commit()?;
24592
            Ok(vec![tree])
        } else {
            // Push the trees so that any existing threads can begin processing the queue.
            let (completion_sender, completion_receiver) = flume::unbounded();
            let tree_count = trees.len();
            for tree in trees {
                self.sender.send(ThreadCommit {
                    tree: tree.0.into_inner(),
                    completion_sender: completion_sender.clone(),
                })?;
            }

            
            // Scale the queue if needed.
            let desired_threads = tree_count.min(self.maximum_threads);
            loop {
                let thread_count = self.thread_count.load(Ordering::SeqCst);
                if (thread_count as usize) >= desired_threads {
                    break;
                }

            
                // Spawn a thread, but ensure that we don't spin up too many threads if another thread is committing at the same time.
                if self
                    .thread_count
                    .compare_exchange(
                        thread_count,
                        thread_count + 1,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    )
                    .is_ok()
                {
                    let commit_receiver = self.receiver.clone();
                    std::thread::Builder::new()
                        .name(String::from("roots-txwriter"))
                        .spawn(move || transaction_commit_thread(commit_receiver))
                        .unwrap();
                }
            }

            
            // Wait for our results
            let mut results = Vec::with_capacity(tree_count);
            for _ in 0..tree_count {
                results.push(completion_receiver.recv()??);
            }

            
            Ok(results)
        }
24592
    }
}

            
impl<File: ManagedFile> Clone for ThreadPool<File> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            receiver: self.receiver.clone(),
            thread_count: self.thread_count.clone(),
            maximum_threads: self.maximum_threads,
        }
    }
}

            
impl<File: ManagedFile> Default for ThreadPool<File> {
17
    fn default() -> Self {
17
        static CPU_COUNT: Lazy<usize> = Lazy::new(num_cpus::get);
17
        Self::new(*CPU_COUNT)
17
    }
}

            
/// Body of a worker thread: commits each queued tree and reports the outcome
/// through that request's completion channel.
///
/// The loop ends once the work queue is disconnected, which lets the thread
/// exit.
#[allow(clippy::needless_pass_by_value)]
fn transaction_commit_thread<File: ManagedFile>(receiver: flume::Receiver<ThreadCommit<File>>) {
    for ThreadCommit {
        mut tree,
        completion_sender,
    } in receiver.iter()
    {
        // On success the committed tree itself is handed back to the requester.
        let outcome = match tree.commit() {
            Ok(_) => Ok(tree),
            Err(error) => Err(error),
        };
        // The requester may have stopped listening already (it returns on the
        // first error it receives), so a failed send is deliberately ignored.
        drop(completion_sender.send(outcome));
    }
}

            
/// A single tree commit queued for a worker thread.
struct ThreadCommit<File: ManagedFile> {
    /// The transaction tree to commit.
    tree: Box<dyn AnyTransactionTree<File>>,
    /// Channel on which the worker reports the outcome: the committed tree on
    /// success, or the error that occurred.
    completion_sender: Sender<Result<Box<dyn AnyTransactionTree<File>>, Error>>,
}

            
8202
fn catch_compaction_and_retry<R, F: Fn() -> Result<R, Error>>(func: F) -> Result<R, Error> {
8202
    loop {
8202
        match func() {
8200
            Ok(result) => return Ok(result),
2
            Err(error) => {
2
                if matches!(error.kind, ErrorKind::TreeCompacted) {
                    continue;
2
                }
2

            
2
                return Err(error);
            }
        }
    }
8202
}

            
fn catch_compaction_and_retry_abortable<
    R,
    E: Display + Debug,
    F: FnMut() -> Result<R, AbortError<E>>,
>(
    mut func: F,
) -> Result<R, AbortError<E>> {
    loop {
        match func() {
            Ok(result) => return Ok(result),
            Err(AbortError::Nebari(error)) => {
                if matches!(error.kind, ErrorKind::TreeCompacted) {
                    continue;
                }

            
                return Err(AbortError::Nebari(error));
            }
            Err(other) => return Err(other),
        }
    }
}

            
#[cfg(test)]
mod tests {
    use byteorder::{BigEndian, ByteOrder};
    use tempfile::tempdir;

            
    use super::*;
    use crate::{
        io::{any::AnyFileManager, fs::StdFileManager, memory::MemoryFileManager},
        test_util::RotatorVault,
        tree::{Root, Unversioned, ValueIndex, Versioned},
    };

            
2
    /// Writes one key to a versioned tree stored through `file_manager` and
    /// checks that reading the key returns the written value.
    fn basic_get_set<M: FileManager>(file_manager: M) {
        let directory = tempdir().unwrap();
        let config = Config::new(directory.path()).file_manager(file_manager);
        let roots = config.open().unwrap();

        let tree = roots.tree(Versioned::tree("test")).unwrap();
        tree.set(b"test", b"value").unwrap();

        let stored = tree.get(b"test").unwrap();
        assert_eq!(stored.expect("key not found"), b"value");
    }

            
1
    /// Runs the basic get/set scenario against the in-memory file manager.
    #[test]
    fn memory_basic_get_set() {
        let manager = MemoryFileManager::default();
        basic_get_set(manager);
    }

            
1
    /// Runs the basic get/set scenario against the standard file manager.
    #[test]
    fn std_basic_get_set() {
        let manager = StdFileManager::default();
        basic_get_set(manager);
    }

            
1
    /// Checks that a write made inside a transaction is visible to that
    /// transaction immediately, but only becomes visible to an outside reader
    /// after the transaction commits.
    #[test]
    fn basic_transaction_isolation_test() {
        let directory = tempdir().unwrap();
        let roots = Config::<StdFileManager>::new(directory.path())
            .open()
            .unwrap();
        let reader = roots.tree(Versioned::tree("test")).unwrap();
        reader.set(b"test", b"value").unwrap();

        // Open a transaction on the same tree and overwrite the key inside it.
        let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
        transaction
            .tree::<Versioned>(0)
            .unwrap()
            .set(b"test", b"updated value")
            .unwrap();

        // The transaction itself observes the pending write.
        let pending = transaction
            .tree::<Versioned>(0)
            .unwrap()
            .get(b"test")
            .unwrap();
        assert_eq!(pending.expect("key not found"), b"updated value");

        // The outside reader still observes the previously stored value.
        let before_commit = reader.get(b"test").unwrap();
        assert_eq!(before_commit.expect("key not found"), b"value");

        transaction.commit().unwrap();

        // After the commit, the outside reader observes the new value.
        let after_commit = reader.get(b"test").unwrap();
        assert_eq!(after_commit.expect("key not found"), b"updated value");
    }

            
1
    /// Checks that rolling a transaction back discards its writes, both for
    /// an outside reader and for a transaction started afterwards.
    #[test]
    fn basic_transaction_rollback_test() {
        let directory = tempdir().unwrap();
        let roots = Config::<StdFileManager>::new(directory.path())
            .open()
            .unwrap();
        let reader = roots.tree(Versioned::tree("test")).unwrap();
        reader.set(b"test", b"value").unwrap();

        // Overwrite the key inside a transaction, then abandon the transaction.
        let abandoned = roots.transaction(&[Versioned::tree("test")]).unwrap();
        abandoned
            .tree::<Versioned>(0)
            .unwrap()
            .set(b"test", b"updated value")
            .unwrap();
        abandoned.rollback();

        // The outside reader still observes the original value.
        let outside = reader.get(b"test").unwrap();
        assert_eq!(outside.expect("key not found"), b"value");

        // A transaction started after the rollback observes the original value too.
        let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
        let inside = transaction
            .tree::<Versioned>(0)
            .unwrap()
            .get(b"test")
            .unwrap();
        assert_eq!(inside.expect("key not found"), b"value");
    }

            
1
    /// Runs the compaction scenario on a versioned tree using the standard
    /// file manager.
    #[test]
    fn std_compact_test_versioned() {
        let manager = StdFileManager::default();
        compact_test::<Versioned, _>(manager);
    }

            
1
    /// Runs the compaction scenario on an unversioned tree using the standard
    /// file manager.
    #[test]
    fn std_compact_test_unversioned() {
        let manager = StdFileManager::default();
        compact_test::<Unversioned, _>(manager);
    }

            
1
    /// Runs the compaction scenario on a versioned tree using the in-memory
    /// file manager.
    #[test]
    fn memory_compact_test_versioned() {
        let manager = MemoryFileManager::default();
        compact_test::<Versioned, _>(manager);
    }

            
1
    /// Runs the compaction scenario on an unversioned tree using the
    /// in-memory file manager.
    #[test]
    fn memory_compact_test_unversioned() {
        let manager = MemoryFileManager::default();
        compact_test::<Unversioned, _>(manager);
    }

            
1
    /// Runs the compaction scenario on a versioned tree through
    /// `AnyFileManager`, first with its `std` variant and then with its
    /// `memory` variant.
    #[test]
    fn any_compact_test_versioned() {
        let std_manager = AnyFileManager::std();
        compact_test::<Versioned, _>(std_manager);

        let memory_manager = AnyFileManager::memory();
        compact_test::<Versioned, _>(memory_manager);
    }

            
1
    /// Runs the compaction scenario on an unversioned tree through
    /// `AnyFileManager`, first with its `std` variant and then with its
    /// `memory` variant.
    #[test]
    fn any_compact_test_unversioned() {
        let std_manager = AnyFileManager::std();
        compact_test::<Unversioned, _>(std_manager);

        let memory_manager = AnyFileManager::memory();
        compact_test::<Unversioned, _>(memory_manager);
    }

            
8
    fn compact_test<R: Root<Value = ArcBytes<'static>>, M: FileManager>(file_manager: M)
8
    where
8
        R::Reducer: Default,
8
    {
8
        const OPERATION_COUNT: usize = 256;
8
        const WORKER_COUNT: usize = 4;
8
        let tempdir = tempdir().unwrap();
8

            
8
        let roots = Config::new(tempdir.path())
8
            .file_manager(file_manager)
8
            .open()
8
            .unwrap();
8
        let tree = roots.tree(R::tree("test")).unwrap();
8
        tree.set("foo", b"bar").unwrap();
8

            
8
        // Spawn a pool of threads that will perform a series of operations
8
        let mut threads = Vec::new();
40
        for worker in 0..WORKER_COUNT {
32
            let tree = tree.clone();
32
            threads.push(std::thread::spawn(move || {
8224
                for relative_id in 0..OPERATION_COUNT {
8192
                    let absolute_id = (worker * OPERATION_COUNT + relative_id) as u64;
8192
                    tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes())
8192
                        .unwrap();
8192
                    let ValueIndex { value, .. } = tree
8192
                        .remove(&absolute_id.to_be_bytes())
8192
                        .unwrap()
8192
                        .ok_or_else(|| panic!("value not found: {:?}", absolute_id))
8192
                        .unwrap();
8192
                    assert_eq!(BigEndian::read_u64(&value), absolute_id);
8192
                    tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes())
8192
                        .unwrap();
8192
                    let newer_value = tree
8192
                        .get(&absolute_id.to_be_bytes())
8192
                        .unwrap()
8192
                        .expect("couldn't find found");
8192
                    assert_eq!(value, newer_value);
                }
32
            }));
32
        }

            
8
        threads.push(std::thread::spawn(move || {
            // While those workers are running, this thread is going to continually
            // execute compaction.
3506
            while tree.count() < (OPERATION_COUNT * WORKER_COUNT) as u64 {
3498
                tree.compact().unwrap();
3498
            }
8
        }));

            
48
        for thread in threads {
40
            thread.join().unwrap();
40
        }
8
    }

            
1
    /// Checks which names `check_name` accepts and rejects.
    #[test]
    fn name_tests() {
        let accepted = check_name("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-.");
        assert!(accepted.is_ok());

        for rejected in ["=", "_transactions"] {
            assert!(check_name(rejected).is_err());
        }
    }

            
1
    /// Checks reading and writing trees when a vault is configured on the
    /// database as a whole, on an individual tree, or not at all.
    #[test]
    fn context_encryption_tests() {
        let directory = tempdir().unwrap();

        // Opens the database with the vault that was used when writing it.
        let open_with_vault = || {
            Config::<StdFileManager>::new(directory.path())
                .vault(RotatorVault::new(13))
                .open()
                .unwrap()
        };
        // Describes the second tree, which carries its own vault.
        let other_key_tree = || Versioned::tree("test-otherkey").with_vault(RotatorVault::new(42));

        // Write one tree using the database's vault and one using its own vault.
        {
            let roots = open_with_vault();
            let tree = roots.tree(Versioned::tree("test")).unwrap();
            tree.set(b"test", b"value").unwrap();

            let other_tree = roots.tree(other_key_tree()).unwrap();
            other_tree.set(b"test", b"other").unwrap();
        }

        // Reopen with the same database vault.
        {
            let roots = open_with_vault();
            let tree = roots.tree(Versioned::tree("test")).unwrap();
            let stored = tree.get(b"test").unwrap();
            assert_eq!(stored.as_deref(), Some(&b"value"[..]));

            // Without its own vault, the other tree cannot be read.
            let bad_tree = roots.tree(Versioned::tree("test-otherkey")).unwrap();
            assert!(bad_tree.get(b"test").is_err());

            // With the matching vault, the other tree's value is returned.
            let other_tree = roots.tree(other_key_tree()).unwrap();
            let stored = other_tree.get(b"test").unwrap();
            assert_eq!(stored.as_deref(), Some(&b"other"[..]));
        }

        // Reopen without any database vault.
        {
            let roots = Config::<StdFileManager>::new(directory.path())
                .open()
                .unwrap();

            // Reading the tree without a vault fails.
            let bad_tree = roots.tree(Versioned::tree("test")).unwrap();
            assert!(bad_tree.get(b"test").is_err());

            // Supplying the vault on the tree alone does not bring the value
            // back: the transaction log is unreadable in this situation, so no
            // transactions are considered valid and the key appears absent.
            let bad_tree = roots
                .tree(Versioned::tree("test").with_vault(RotatorVault::new(13)))
                .unwrap();
            assert!(bad_tree.get(b"test").unwrap().is_none());
        }
    }

            
1
    /// Checks that oversized transaction data is rejected with
    /// `ErrorKind::ValueTooLarge`, and that the tree can still be written to
    /// after that transaction is rolled back and the database is reopened.
    #[test]
    fn too_large_transaction() {
        const SIXTEEN_MIB: usize = 16 * 1024 * 1024;

        let directory = tempdir().unwrap();
        let config = Config::<StdFileManager>::new(directory.path());

        // First session: stage a large value, then attach data that is rejected.
        {
            let roots = config.clone().open().unwrap();
            let mut transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();

            transaction
                .tree::<Versioned>(0)
                .unwrap()
                .set(b"test", vec![0; SIXTEEN_MIB])
                .unwrap();

            // NOTE(review): the `- 7` presumably places the payload just over
            // the size limit; confirm against the log entry format.
            let rejection = transaction
                .entry_mut()
                .set_data(vec![0; SIXTEEN_MIB - 7])
                .unwrap_err();
            assert!(matches!(rejection.kind, ErrorKind::ValueTooLarge));

            transaction.rollback();
        }

        // Second session: an ordinary write must still commit.
        {
            let roots = config.open().unwrap();
            let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();

            transaction
                .tree::<Versioned>(0)
                .unwrap()
                .set(b"test", b"updated value")
                .unwrap();

            transaction.commit().unwrap();
        }
    }
}