//! Append-only B-Tree implementation
//!
//! The file format is loosely inspired by
//! [Couchstore](https://github.com/couchbase/couchstore). Nebari is not
//! compatible with Couchstore in any way.
//!
//! ## Numbers and Alignment
//!
//! - All numbers are encoded in big-endian format/network byte order.
//! - All values are tightly packed. There is no padding or alignment that isn't
//!   explicitly included.
//!
//! ## File Organization
//!
//! This file format cannot be read by starting at byte 0 and iterating
//! forward. The contents of any given byte offset are unknown until the file's
//! current root header has been found.
//!
//! Data is always appended to the end of the file. When a tree is committed,
//! all of the changed nodes are appended to the end of the file, except for
//! the Root.
//!
//! Before writing the Root, the file is padded to a multiple of
//! [`PAGE_SIZE`]. A 3-byte magic code is written, followed by a byte for the
//! [`PageHeader`].
//!
//! The Root is then serialized and written as a chunk.
//!
//! To locate the most recent header, find the largest multiple of
//! [`PAGE_SIZE`] that is less than or equal to the file's length. Check the
//! first three bytes at that offset for the magic code. If found, attempt to
//! read a chunk. If successful, attempt to deserialize the Root.
//!
//! If any step fails, step back through the file one [`PAGE_SIZE`] at a time
//! until a valid header is found.
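//!
//! A minimal sketch of that search, assuming `std::io::{Read, Seek, SeekFrom}`
//! are in scope along with the constants in this module (`locate_root` is
//! illustrative, not an API of this crate; the real logic also validates the
//! chunk and the deserialized Root before trusting a page):
//!
//! ```ignore
//! fn locate_root(file: &mut (impl Read + Seek), file_length: u64) -> std::io::Result<Option<u64>> {
//!     // Start at the largest PAGE_SIZE multiple within the file, leaving
//!     // room for the 3-byte magic code and the header byte.
//!     let mut offset = file_length - (file_length % PAGE_SIZE as u64);
//!     if file_length - offset < 4 {
//!         offset = offset.saturating_sub(PAGE_SIZE as u64);
//!     }
//!     loop {
//!         let mut header = [0_u8; 4];
//!         file.seek(SeekFrom::Start(offset))?;
//!         file.read_exact(&mut header)?;
//!         // The magic code, followed by the PageHeader byte.
//!         if &header[0..3] == b"Nbr" && PageHeader::try_from(header[3]).is_ok() {
//!             return Ok(Some(offset + 4)); // The Root chunk begins here.
//!         }
//!         if offset == 0 {
//!             return Ok(None); // No valid header exists.
//!         }
//!         offset -= PAGE_SIZE as u64;
//!     }
//! }
//! ```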
//!
//! ## Chunks
//!
//! Each time a value, B-Tree node, or header is written, it is written as a
//! chunk. If a [`Vault`](crate::Vault) is in use, each chunk will be
//! pre-processed by the vault before a `CRC-32-BZIP2` checksum is calculated.
//! A chunk is limited to 4 gigabytes (2^32 bytes) of data.
//!
//! The chunk is written as:
//!
//! - `u32` - Data length, excluding the header.
//! - `u32` - CRC of the contents.
//! - `[u8]` - Contents.
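//!
//! A sketch of reading a chunk at a known `position`, assuming the imports in
//! this module are in scope (this mirrors the layout above; the real
//! `read_chunk` additionally runs the contents through the vault and consults
//! the chunk cache):
//!
//! ```ignore
//! fn read_chunk_sketch(file: &mut (impl Read + Seek), position: u64) -> Result<Vec<u8>, Error> {
//!     file.seek(SeekFrom::Start(position))?;
//!     let length = file.read_u32::<BigEndian>()?; // Data length, excluding the header.
//!     let crc = file.read_u32::<BigEndian>()?; // CRC-32-BZIP2 of the contents.
//!     let mut contents = vec![0; length as usize];
//!     file.read_exact(&mut contents)?;
//!     if crc != CRC32.checksum(&contents) {
//!         return Err(Error::data_integrity("chunk CRC mismatch"));
//!     }
//!     Ok(contents)
//! }
//! ```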

use std::{
    borrow::Cow,
    cell::RefCell,
    collections::HashMap,
    fmt::{Debug, Display},
    hash::BuildHasher,
    io::SeekFrom,
    marker::PhantomData,
    ops::{Bound, Deref, DerefMut, Range, RangeBounds},
    sync::Arc,
};

use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use crc::{Crc, CRC_32_BZIP2};
use parking_lot::MutexGuard;

use crate::{
    chunk_cache::CacheEntry,
    error::Error,
    io::{
        File, FileManager, FileOp, IntoPathId, ManagedFile, ManagedFileOpener, OpenableFile,
        OperableFile, PathId,
    },
    roots::AbortError,
    transaction::{ManagedTransaction, TransactionManager},
    tree::btree::{BTreeNode, Indexer, KeyOperation, Reducer, ScanArgs},
    vault::AnyVault,
    ArcBytes, ChunkCache, CompareAndSwapError, Context, ErrorKind,
};

/// B+Tree types
pub mod btree;
mod by_id;
mod by_sequence;
mod interior;
mod key_entry;
mod modify;
pub(crate) mod root;
mod serialization;
pub(crate) mod state;
mod unversioned;
mod versioned;

pub(crate) const DEFAULT_MAX_ORDER: usize = 1000;

pub use self::{
    by_id::{ByIdIndexer, ByIdStats, UnversionedByIdIndex, VersionedByIdIndex},
    by_sequence::{BySequenceIndex, BySequenceStats, SequenceId},
    interior::{Interior, Pointer},
    key_entry::{KeyEntry, PositionIndex},
    modify::{CompareSwap, CompareSwapFn, Modification, Operation, PersistenceMode},
    root::{AnyTreeRoot, Root, TreeRoot},
    serialization::BinarySerialization,
    state::{ActiveState, State},
    unversioned::{Unversioned, UnversionedTreeRoot},
    versioned::{KeySequence, SequenceEntry, SequenceIndex, Versioned, VersionedTreeRoot},
};

/// The number of bytes in each page on-disk.
// The memory used by PagedWriter is PAGE_SIZE * PAGED_WRITER_BATCH_COUNT.
// E.g., 256 * 4 = 1024 bytes.
pub const PAGE_SIZE: usize = 256;

const CRC32: Crc<u32> = Crc::<u32>::new(&CRC_32_BZIP2);

/// The header byte for a tree file's page.
#[derive(Eq, PartialEq)]
pub enum PageHeader {
    /// A [`VersionedTreeRoot`] header.
    VersionedHeader = 2,
    /// An [`UnversionedTreeRoot`] header.
    UnversionedHeader = 3,
}

impl TryFrom<u8> for PageHeader {
    type Error = ErrorKind;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            2 => Ok(Self::VersionedHeader),
            3 => Ok(Self::UnversionedHeader),
            _ => Err(ErrorKind::data_integrity(format!(
                "invalid block header: {}",
                value
            ))),
        }
    }
}

/// An append-only tree file.
///
/// ## Generics
/// - `File`: A [`ManagedFile`] implementor.
#[derive(Debug)]
pub struct TreeFile<Root: root::Root, File: ManagedFile> {
    /// The file handle the tree is stored within.
    pub file: <File::Manager as FileManager>::FileHandle,
    /// The state of the file.
    pub state: State<Root>,
    /// The vault used to encrypt/decrypt chunks.
    pub vault: Option<Arc<dyn AnyVault>>,
    /// The cache used for chunks read from the file.
    pub cache: Option<ChunkCache>,
    scratch: Vec<u8>,
}

impl<Root: root::Root, File: ManagedFile> Deref for TreeFile<Root, File> {
    type Target = <File::Manager as FileManager>::FileHandle;

    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl<Root: root::Root, File: ManagedFile> DerefMut for TreeFile<Root, File> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.file
    }
}

impl<Root: root::Root, File: ManagedFile> TreeFile<Root, File> {
    /// Returns a tree as contained in `file`.
    ///
    /// `state` should already be initialized using [`Self::initialize_state`] if the file exists.
    pub fn new(
        file: <File::Manager as FileManager>::FileHandle,
        state: State<Root>,
        vault: Option<Arc<dyn AnyVault>>,
        cache: Option<ChunkCache>,
    ) -> Result<Self, Error> {
        Ok(Self {
            file,
            state,
            vault,
            cache,
            scratch: Vec::new(),
        })
    }

    /// Opens a tree file with read-only permissions.
    pub fn read(
        path: impl IntoPathId,
        state: State<Root>,
        context: &Context<File::Manager>,
        transactions: Option<&TransactionManager<File::Manager>>,
    ) -> Result<Self, Error> {
        let file = context.file_manager.read(path)?;
        Self::initialize_state(&state, file.id(), context, transactions)?;
        Self::new(file, state, context.vault.clone(), context.cache.clone())
    }

    /// Opens a tree file with the ability to read and write.
    pub fn write(
        path: impl IntoPathId,
        state: State<Root>,
        context: &Context<File::Manager>,
        transactions: Option<&TransactionManager<File::Manager>>,
    ) -> Result<Self, Error> {
        let file = context.file_manager.append(path)?;
        Self::initialize_state(&state, file.id(), context, transactions)?;
        Self::new(file, state, context.vault.clone(), context.cache.clone())
    }

    /// Attempts to load the last saved state of this tree into `state`.
    pub fn initialize_state(
        state: &State<Root>,
        file_path: &PathId,
        context: &Context<File::Manager>,
        transaction_manager: Option<&TransactionManager<File::Manager>>,
    ) -> Result<(), Error> {
        {
            let read_state = state.read();
            if read_state.initialized() {
                return Ok(());
            }
        }

        let mut active_state = state.lock();
        if active_state.initialized() {
            return Ok(());
        }

        active_state.file_id = file_path.id();
        let file_length = context.file_manager.file_length(file_path)?;
        if file_length == 0 {
            active_state.root.initialize_default();
            active_state.publish(state);
            return Ok(());
        }

        let mut tree = context.file_manager.open_for_read(file_path)?;

        // Scan back block by block until we find a header page.
        let mut block_start = file_length - (file_length % PAGE_SIZE as u64);
        if file_length - block_start < 4 {
            // We need room for at least the 4-byte page header
            block_start -= PAGE_SIZE as u64;
        }
        let mut scratch_buffer = vec![0_u8; 4];
        loop {
            // Read the page header
            tree.seek(SeekFrom::Start(block_start))?;
            tree.read_exact(&mut scratch_buffer)?;

            #[allow(clippy::match_on_vec_items)]
            match (
                &scratch_buffer[0..3],
                PageHeader::try_from(scratch_buffer[3]),
            ) {
                (b"Nbr", Ok(header)) => {
                    if header != Root::HEADER {
                        return Err(Error::data_integrity(format!(
                            "Tree {:?} contained another header type",
                            file_path
                        )));
                    }
                    let contents = match read_chunk(
                        block_start + 4,
                        true,
                        &mut tree,
                        context.vault(),
                        context.cache(),
                    )? {
                        CacheEntry::ArcBytes(buffer) => buffer,
                        CacheEntry::Decoded(_) => unreachable!(),
                    };
                    let root = Root::deserialize(contents, active_state.root.reducer().clone())
                        .map_err(|err| ErrorKind::DataIntegrity(Box::new(err)))?;
                    if let Some(transaction_manager) = transaction_manager {
                        if root.transaction_id().valid()
                            && !transaction_manager
                                .transaction_was_successful(root.transaction_id())?
                        {
                            // The transaction wasn't written successfully, so
                            // we cannot trust the data present.
                            if block_start == 0 {
                                // No data was ever fully written.
                                active_state.root.initialize_default();
                                return Ok(());
                            }
                            block_start -= PAGE_SIZE as u64;
                            continue;
                        }
                    }
                    active_state.root = root;
                    break;
                }
                (_, Ok(_) | Err(_)) => {
                    if block_start == 0 {
                        eprintln!(
                            "Tree {:?} contained data, but no valid pages were found",
                            file_path
                        );
                        active_state.root.initialize_default();
                        break;
                    }
                    block_start -= PAGE_SIZE as u64;
                    continue;
                }
            }
        }

        active_state.current_position = file_length;
        active_state.publish(state);
        Ok(())
    }

    /// Sets a key/value pair, replacing any previously stored value. If you
    /// wish to retrieve the previously stored value, use
    /// [`replace()`](Self::replace) instead.
    ///
    /// Returns the new/updated index for this key.
    pub fn set(
        &mut self,
        persistence_mode: impl Into<PersistenceMode>,
        key: impl Into<ArcBytes<'static>>,
        value: impl Into<Root::Value>,
    ) -> Result<Root::Index, Error> {
        Ok(self
            .file
            .execute(TreeModifier {
                state: &self.state,
                vault: self.vault.as_deref(),
                cache: self.cache.as_ref(),
                modification: Some(Modification {
                    persistence_mode: persistence_mode.into(),
                    keys: vec![key.into()],
                    operation: Operation::Set(value.into()),
                }),
                scratch: &mut self.scratch,
            })?
            .into_iter()
            .next()
            .expect("always produces a single result")
            .index
            .expect("modification always produces a new index"))
    }

    /// Executes a modification. Returns a list of modified keys and their
    /// updated indexes, if the keys are still present.
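    ///
    /// A minimal usage sketch, assuming `tree` is an open `TreeFile` whose
    /// value type is `ArcBytes<'static>`, the keys are provided in sorted
    /// order, and `PersistenceMode::Sync` fully synchronizes the write
    /// (illustrative only):
    ///
    /// ```ignore
    /// // Set two keys to the same value in a single pass over the tree.
    /// let results = tree.modify(Modification {
    ///     persistence_mode: PersistenceMode::Sync,
    ///     keys: vec![
    ///         ArcBytes::from(b"apple".to_vec()),
    ///         ArcBytes::from(b"banana".to_vec()),
    ///     ],
    ///     operation: Operation::Set(ArcBytes::from(b"fruit".to_vec())),
    /// })?;
    /// assert_eq!(results.len(), 2);
    /// ```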
    pub fn modify(
        &mut self,
        modification: Modification<'_, Root::Value, Root::Index>,
    ) -> Result<Vec<ModificationResult<Root::Index>>, Error> {
        self.file.execute(TreeModifier {
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            modification: Some(modification),
            scratch: &mut self.scratch,
        })
    }

    /// Compares the value of `key` against `old`. If the values match, the
    /// key will be set to the new value if `new` is `Some` or removed if
    /// `new` is `None`.
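    ///
    /// A usage sketch, assuming `tree` is an open `TreeFile` storing
    /// `ArcBytes<'static>` values and `PersistenceMode::Sync` is available
    /// (illustrative only):
    ///
    /// ```ignore
    /// // Swap "old" for "new" only if the stored value is still "old".
    /// if let Err(CompareAndSwapError::Conflict(current)) = tree.compare_and_swap(
    ///     b"key",
    ///     Some(&b"old"[..]),
    ///     Some(ArcBytes::from(b"new".to_vec())),
    ///     PersistenceMode::Sync,
    /// ) {
    ///     // Another writer changed the value; `current` holds what is stored now.
    /// }
    /// ```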
    pub fn compare_and_swap<Old>(
        &mut self,
        key: &[u8],
        old: Option<&Old>,
        mut new: Option<Root::Value>,
        persistence_mode: impl Into<PersistenceMode>,
    ) -> Result<(), CompareAndSwapError<Root::Value>>
    where
        Old: PartialEq + ?Sized,
        Root::Value: AsRef<Old> + Clone,
    {
        let mut result = Ok(());
        self.modify(Modification {
            persistence_mode: persistence_mode.into(),
            keys: vec![ArcBytes::from(key)],
            operation: Operation::CompareSwap(CompareSwap::new(&mut |_key,
                                                                     _index,
                                                                     value: Option<
                Root::Value,
            >| {
                if old == value.as_ref().map(AsRef::as_ref) {
                    match new.take() {
                        Some(new) => KeyOperation::Set(new),
                        None => KeyOperation::Remove,
                    }
                } else {
                    result = Err(CompareAndSwapError::Conflict(value));
                    KeyOperation::Skip
                }
            })),
        })?;
        result
    }

    /// Removes `key` and returns the existing value and index, if present.
    pub fn remove(
        &mut self,
        key: &[u8],
        persistence_mode: impl Into<PersistenceMode>,
    ) -> Result<Option<TreeValueIndex<Root>>, Error> {
        let mut existing_value = None;
        self.modify(Modification {
            persistence_mode: persistence_mode.into(),
            keys: vec![ArcBytes::from(key)],
            operation: Operation::CompareSwap(CompareSwap::new(
                &mut |_key, index: Option<&Root::Index>, value| {
                    existing_value = if let (Some(index), Some(value)) = (index, value) {
                        Some(ValueIndex {
                            value,
                            index: index.clone(),
                        })
                    } else {
                        None
                    };
                    KeyOperation::Remove
                },
            )),
        })?;
        Ok(existing_value)
    }

    /// Sets `key` to `value`. Returns a tuple containing two elements:
    ///
    /// - The previously stored value, if a value was already present.
    /// - The new/updated index for this key.
    #[allow(clippy::missing_panics_doc)]
    pub fn replace(
        &mut self,
        key: impl Into<ArcBytes<'static>>,
        value: impl Into<Root::Value>,
        persistence_mode: impl Into<PersistenceMode>,
    ) -> Result<(Option<Root::Value>, Root::Index), Error> {
        let mut existing_value = None;
        let mut value = Some(value.into());
        let result = self
            .modify(Modification {
                persistence_mode: persistence_mode.into(),
                keys: vec![key.into()],
                operation: Operation::CompareSwap(CompareSwap::new(
                    &mut |_, _index, stored_value| {
                        existing_value = stored_value;
                        KeyOperation::Set(value.take().unwrap())
                    },
                )),
            })?
            .into_iter()
            .next()
            .unwrap();

        Ok((existing_value, result.index.unwrap()))
    }

    /// Gets the value stored for `key`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn get(&mut self, key: &[u8], in_transaction: bool) -> Result<Option<Root::Value>, Error> {
        let mut buffer = None;
        self.file.execute(TreeGetter {
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            keys: KeyRange::new(std::iter::once(key)),
            key_reader: |_key, value, _index| {
                buffer = Some(value);
                Ok(())
            },
            key_evaluator: |_, _| ScanEvaluation::ReadData,
        })?;
        Ok(buffer)
    }

    /// Gets the index stored for `key`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn get_index(
        &mut self,
        key: &[u8],
        in_transaction: bool,
    ) -> Result<Option<Root::Index>, Error> {
        let mut found_index = None;
        self.file.execute(TreeGetter {
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            keys: KeyRange::new(std::iter::once(key)),
            key_reader: |_, _, _| unreachable!(),
            key_evaluator: |_key, index| {
                found_index = Some(index.clone());
                ScanEvaluation::Skip
            },
        })?;
        Ok(found_index)
    }

    /// Gets the value and index stored for `key`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn get_with_index(
        &mut self,
        key: &[u8],
        in_transaction: bool,
    ) -> Result<Option<TreeValueIndex<Root>>, Error> {
        let mut buffer = None;
        let mut found_index = None;
        self.file.execute(TreeGetter {
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            keys: KeyRange::new(std::iter::once(key)),
            key_reader: |_key, value, index| {
                buffer = Some(value);
                found_index = Some(index);
                Ok(())
            },
            key_evaluator: |_, _| ScanEvaluation::ReadData,
        })?;
        if let (Some(value), Some(index)) = (buffer, found_index) {
            Ok(Some(ValueIndex { value, index }))
        } else {
            Ok(None)
        }
    }

    /// Gets the values stored in `keys`. Does not error if a key is missing.
    /// Returns key/value pairs in an unspecified order. Keys are required to
    /// be pre-sorted.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, keys)))]
    pub fn get_multiple<'keys, KeysIntoIter, KeysIter>(
        &mut self,
        keys: KeysIntoIter,
        in_transaction: bool,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Value)>, Error>
    where
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
    {
        let keys = keys.into_iter();
        let mut buffers = Vec::with_capacity(keys.len());
        self.file.execute(TreeGetter {
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            keys: KeyRange::new(keys),
            key_reader: |key, value, _| {
                buffers.push((key, value));
                Ok(())
            },
            key_evaluator: |_, _| ScanEvaluation::ReadData,
        })?;
        Ok(buffers)
    }

    /// Gets the indexes stored in `keys`. Does not error if a key is missing.
    /// Returns key/index pairs in an unspecified order. Keys are required to
    /// be pre-sorted.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, keys)))]
    pub fn get_multiple_indexes<'keys, KeysIntoIter, KeysIter>(
        &mut self,
        keys: KeysIntoIter,
        in_transaction: bool,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Index)>, Error>
    where
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
    {
        let keys = keys.into_iter();
        let mut buffers = Vec::with_capacity(keys.len());
        self.file.execute(TreeGetter {
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            keys: KeyRange::new(keys),
            key_reader: |key, _value, index| {
                buffers.push((key, index));
                Ok(())
            },
            key_evaluator: |_, _| ScanEvaluation::ReadData,
        })?;
        Ok(buffers)
    }

    /// Gets the values and indexes stored in `keys`. Does not error if a key
    /// is missing. Returns entries in an unspecified order. Keys are required
    /// to be pre-sorted.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, keys)))]
    pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>(
        &mut self,
        keys: KeysIntoIter,
        in_transaction: bool,
    ) -> Result<Vec<TreeEntry<Root>>, Error>
    where
        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
    {
        let keys = keys.into_iter();
        let mut buffers = Vec::with_capacity(keys.len());
        self.file.execute(TreeGetter {
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            keys: KeyRange::new(keys),
            key_reader: |key, value, index| {
                buffers.push(Entry { key, value, index });
                Ok(())
            },
            key_evaluator: |_, _| ScanEvaluation::ReadData,
        })?;
        Ok(buffers)
    }

    /// Retrieves all keys and values for keys that are contained by `range`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn get_range<'keys, KeyRangeBounds>(
        &mut self,
        range: &'keys KeyRangeBounds,
        in_transaction: bool,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Value)>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    {
        let mut results = Vec::new();
        self.scan(
            range,
            true,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |_, _| ScanEvaluation::ReadData,
            |key, _index, value| {
                results.push((key, value));
                Ok(())
            },
        )?;
        Ok(results)
    }

    /// Retrieves all keys and indexes for keys that are contained by `range`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn get_range_indexes<'keys, KeyRangeBounds>(
        &mut self,
        range: &'keys KeyRangeBounds,
        in_transaction: bool,
    ) -> Result<Vec<(ArcBytes<'static>, Root::Index)>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    {
        let mut results = Vec::new();
        self.scan(
            range,
            true,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |key, index| {
                results.push((key.clone(), index.clone()));
                ScanEvaluation::Skip
            },
            |_key, _index, _value| unreachable!(),
        )?;
        Ok(results)
    }

    /// Retrieves all keys, values, and indexes for keys that are contained by
    /// `range`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn get_range_with_indexes<'keys, KeyRangeBounds>(
        &mut self,
        range: &'keys KeyRangeBounds,
        in_transaction: bool,
    ) -> Result<Vec<TreeEntry<Root>>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    {
        let mut results = Vec::new();
        self.scan(
            range,
            true,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |_, _| ScanEvaluation::ReadData,
            |key, index, value| {
                results.push(Entry {
                    key,
                    value,
                    index: index.clone(),
                });
                Ok(())
            },
        )?;
        Ok(results)
    }

    /// Scans the tree across all nodes that might contain keys within
    /// `range`.
    ///
    /// If `forwards` is true, the tree is scanned in ascending order.
    /// Otherwise, the tree is scanned in descending order.
    ///
    /// `node_evaluator` is invoked for each [`Interior`] node to determine if
    /// the node should be traversed. The parameters to the callback are:
    ///
    /// - `&ArcBytes<'static>`: The maximum key stored within all of the
    ///   node's children.
    /// - `&Root::ReducedIndex`: The reduced index value stored within the node.
    /// - `usize`: The depth of the node. The root nodes are depth 0.
    ///
    /// The result of the callback is a [`ScanEvaluation`]. To read children
    /// nodes, return [`ScanEvaluation::ReadData`].
    ///
    /// `key_evaluator` is invoked for each key encountered that is contained
    /// within `range`. For all [`ScanEvaluation::ReadData`] results returned,
    /// `key_reader` will be invoked with the key and value. `key_reader` may
    /// not be invoked in the same order as the keys are scanned.
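    ///
    /// A sketch that collects every key/value pair in ascending order,
    /// assuming `tree` is an open `TreeFile` (illustrative only):
    ///
    /// ```ignore
    /// use std::convert::Infallible;
    ///
    /// let mut pairs = Vec::new();
    /// tree.scan::<Infallible, _, _, _, _>(
    ///     &(..),  // The full key range.
    ///     true,   // Scan forwards.
    ///     false,  // Not in a transaction.
    ///     |_max_key, _reduced, _depth| ScanEvaluation::ReadData, // Descend into every node.
    ///     |_key, _index| ScanEvaluation::ReadData,               // Read every key's value.
    ///     |key, _index, value| {
    ///         pairs.push((key, value));
    ///         Ok(())
    ///     },
    /// )?;
    /// ```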
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(skip(self, node_evaluator, key_evaluator, key_reader))
    )]
    pub fn scan<'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
        &mut self,
        range: &'keys KeyRangeBounds,
        forwards: bool,
        in_transaction: bool,
        node_evaluator: NodeEvaluator,
        key_evaluator: KeyEvaluator,
        key_reader: DataCallback,
    ) -> Result<(), AbortError<CallerError>>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
        NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
        DataCallback: FnMut(
            ArcBytes<'static>,
            &Root::Index,
            Root::Value,
        ) -> Result<(), AbortError<CallerError>>,
        CallerError: Display + Debug,
    {
        self.file.execute(TreeScanner {
            forwards,
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            range,
            node_evaluator,
            key_reader,
            key_evaluator,
            _phantom: PhantomData,
        })?;
        Ok(())
    }

    /// Returns the reduced index over the provided range. This is an
    /// aggregation function that builds atop the `scan()` operation which
    /// calls [`Reducer::reduce()`] and [`Reducer::rereduce()`] on all matching
    /// indexes stored within the nodes of this tree, producing a single
    /// aggregated [`Root::ReducedIndex`] value.
    ///
    /// If no keys match, the returned result is what [`Reducer::rereduce()`]
    /// returns when an empty slice is provided.
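    ///
    /// A usage sketch, assuming `tree` is an open `TreeFile` (illustrative
    /// only):
    ///
    /// ```ignore
    /// // Aggregate the reduced index across every key in ["a", "m").
    /// let reduced = tree.reduce(&(&b"a"[..]..&b"m"[..]), false)?;
    /// ```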
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    pub fn reduce<'keys, KeyRangeBounds>(
        &mut self,
        range: &'keys KeyRangeBounds,
        in_transaction: bool,
    ) -> Result<Option<Root::ReducedIndex>, Error>
    where
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
        Root::Index: Clone,
    {
        let reducer = {
            let state = self.state.lock();
            state.root.reducer().clone()
        };
        let reduce_state = RefCell::new(ReduceState::new(reducer));
        self.file.execute(TreeScanner {
            forwards: true,
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            range,
            node_evaluator: |max_key, index, depth| {
                let mut state = reduce_state.borrow_mut();
                state.reduce_to_depth(depth);
                let start_is_after_max = match range.start_bound() {
                    Bound::Unbounded => false,
                    Bound::Excluded(start) => start >= &max_key.as_slice(),
                    Bound::Included(start) => start > &max_key.as_slice(),
                };
                let start_is_lowest = match range.start_bound() {
                    Bound::Unbounded => true,
                    Bound::Excluded(start) => start < &state.lowest_key.as_slice(),
                    Bound::Included(start) => start <= &state.lowest_key.as_slice(),
                };
                let end_included = match range.end_bound() {
                    Bound::Included(end) => end <= &max_key.as_slice(),
                    Bound::Excluded(end) => end < &max_key.as_slice(),
                    Bound::Unbounded => true,
                };
                if start_is_after_max {
                    // We are beyond the end, we can stop scanning.
                    ScanEvaluation::Stop
                } else if end_included && start_is_lowest {
                    // The node is fully included. Copy the index to the
                    // stack and skip all the children.
                    state.push_reduced(depth, index.clone());
                    ScanEvaluation::Skip
                } else {
                    // This node is partially contained.
                    ScanEvaluation::ReadData
                }
            },
            key_evaluator: |key, index| {
                if range.contains(&key.as_slice()) {
                    let mut state = reduce_state.borrow_mut();
                    state.push_index(index.clone());
                }
                ScanEvaluation::Skip
            },
            key_reader: |_, _, _| unreachable!(),
            _phantom: PhantomData,
        })?;
        let reduce_state = reduce_state.into_inner();
        Ok(reduce_state.finish())
    }

    /// Returns the first key of the tree.
    pub fn first_key(&mut self, in_transaction: bool) -> Result<Option<ArcBytes<'static>>, Error> {
        let mut result = None;
        self.scan(
            &(..),
            true,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |key, _index| {
                result = Some(key.clone());
                ScanEvaluation::Stop
            },
            |_key, _index, _value| Ok(()),
        )?;

        Ok(result)
    }

    /// Returns the first key and value of the tree.
    pub fn first(
        &mut self,
        in_transaction: bool,
    ) -> Result<Option<(ArcBytes<'static>, Root::Value)>, Error> {
        let mut result = None;
        let mut key_requested = false;
        self.scan(
            &(..),
            true,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |_, _| {
                if key_requested {
                    ScanEvaluation::Stop
                } else {
                    key_requested = true;
                    ScanEvaluation::ReadData
                }
            },
            |key, _index, value| {
                result = Some((key, value));
                Ok(())
            },
        )?;

        Ok(result)
    }

    /// Returns the last key of the tree.
    pub fn last_key(&mut self, in_transaction: bool) -> Result<Option<ArcBytes<'static>>, Error> {
        let mut result = None;
        self.scan(
            &(..),
            false,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |key, _index| {
                result = Some(key.clone());
                ScanEvaluation::Stop
            },
            |_key, _index, _value| Ok(()),
        )?;

        Ok(result)
    }

    /// Returns the last key and value of the tree.
    pub fn last(
        &mut self,
        in_transaction: bool,
    ) -> Result<Option<(ArcBytes<'static>, Root::Value)>, Error> {
        let mut result = None;
        let mut key_requested = false;
        self.scan(
            &(..),
            false,
            in_transaction,
            |_, _, _| ScanEvaluation::ReadData,
            |_, _| {
                if key_requested {
                    ScanEvaluation::Stop
                } else {
                    key_requested = true;
                    ScanEvaluation::ReadData
                }
            },
            |key, _index, value| {
                result = Some((key, value));
                Ok(())
            },
        )?;

        Ok(result)
    }

    /// Commits the tree. This is only needed if writes were done with a
    /// transaction id. This will fully flush the tree and publish the
    /// transactional state to be available to readers.
    pub fn commit(&mut self) -> Result<(), Error> {
        self.file.execute(TreeWriter {
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            scratch: &mut self.scratch,
        })
    }

    /// Rewrites the database, removing all unused data in the process. For a
    /// `VersionedTreeRoot`, this will remove old version information.
    ///
    /// This process is done atomically by creating a new file containing the
    /// active data. Once the new file has all the current file's data, the file
    /// contents are swapped using atomic file operations.
    pub fn compact(
        mut self,
        file_manager: &File::Manager,
        transactions: Option<TransactableCompaction<'_, File::Manager>>,
    ) -> Result<Self, Error> {
        let (compacted_file, finisher) = self.file.execute(TreeCompactor {
            state: &self.state,
            manager: file_manager,
            vault: self.vault.as_deref(),
            transactions,
            scratch: &mut self.scratch,
        })?;
        self.file = self
            .file
            .replace_with(compacted_file, file_manager, |file_id| {
                finisher.finish(file_id.id().expect("id can't be none at this stage"));
            })?;
        Ok(self)
    }
}

#[derive(Debug)]
struct ReduceState<R, I, RI> {
    depths: Vec<DepthState<I, RI>>,
    lowest_key: ArcBytes<'static>,
    reducer: R,
}

impl<R, I, RI> ReduceState<R, I, RI>
where
    I: Clone,
    RI: Clone,
    R: Reducer<I, RI>,
{
    fn new(reducer: R) -> Self {
        Self {
            depths: vec![DepthState::default()],
            lowest_key: ArcBytes::default(),
            reducer,
        }
    }

    fn reduce_to_depth(&mut self, depth: usize) {
        while self.depths.len() > depth + 1 {
            let state_to_reduce = self.depths.pop().unwrap();
            if let Some(reduced) = state_to_reduce.finish(&self.reducer) {
                self.depths.last_mut().unwrap().reduced.push(reduced);
            }
        }
    }

    fn push_reduced(&mut self, depth: usize, reduced: RI) {
        if self.depths.len() < depth + 1 {
            self.depths.resize(depth + 1, DepthState::default());
        }
        self.depths[depth].reduced.push(reduced);
    }

    fn push_index(&mut self, index: I) {
        self.depths.last_mut().unwrap().indexes.push(index);
    }

    fn finish(mut self) -> Option<RI> {
        self.reduce_to_depth(0);
        self.depths.pop().unwrap().finish(&self.reducer)
    }
}

#[derive(Clone, Debug)]
struct DepthState<I, RI> {
    reduced: Vec<RI>,
    indexes: Vec<I>,
}

impl<I, RI> Default for DepthState<I, RI> {
    fn default() -> Self {
        Self {
            reduced: Vec::default(),
            indexes: Vec::default(),
        }
    }
}

impl<I, RI> DepthState<I, RI>
where
    I: Clone,
    RI: Clone,
{
    fn finish<R>(mut self, reducer: &R) -> Option<RI>
    where
        R: Reducer<I, RI>,
    {
        if !self.indexes.is_empty() {
            self.reduced.push(reducer.reduce(self.indexes.iter()));
        }

        (!self.reduced.is_empty()).then(|| reducer.rereduce(self.reduced.iter()))
    }
}
impl<File: ManagedFile, Index> TreeFile<VersionedTreeRoot<Index>, File>
where
    Index: EmbeddedIndex<ArcBytes<'static>> + Clone + Debug + 'static,
{
    /// Scans the tree for keys that are contained within `range`. If `forwards`
    /// is true, scanning starts at the lowest sort-order key and scans forward.
    /// Otherwise, scanning starts at the highest sort-order key and scans
    /// backwards. `key_evaluator` is invoked for each key as it is encountered.
    /// For all [`ScanEvaluation::ReadData`] results returned, `data_callback`
    /// will be invoked with the key and value. The callback may not be invoked
    /// in the same order as the keys are scanned.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(skip(self, key_evaluator, data_callback))
    )]
    pub fn scan_sequences<CallerError, Range, KeyEvaluator, DataCallback>(
        &mut self,
        range: Range,
        forwards: bool,
        in_transaction: bool,
        mut key_evaluator: KeyEvaluator,
        data_callback: DataCallback,
    ) -> Result<(), AbortError<CallerError>>
    where
        Range: RangeBounds<SequenceId> + Debug + 'static,
        KeyEvaluator: FnMut(KeySequence<Index>) -> ScanEvaluation,
        DataCallback:
            FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
        CallerError: Display + Debug,
    {
        self.file.execute(TreeSequenceScanner {
            forwards,
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            range: &U64Range::new(range).borrow_as_bytes(),
            key_evaluator: &mut move |key: &ArcBytes<'_>, index: &BySequenceIndex<Index>| {
                let id = SequenceId(BigEndian::read_u64(key));
                key_evaluator(KeySequence {
                    key: index.key.clone(),
                    sequence: id,
                    last_sequence: index.last_sequence,
                    embedded: index.embedded.clone(),
                })
            },
            data_callback,
        })?;
        Ok(())
    }

    /// Retrieves the keys and values associated with one or more `sequences`.
    /// The value retrieved is the value of the key at the given [`SequenceId`].
    /// If a sequence is not found, it will not appear in the result map. If
    /// the value was removed, `None` is returned for the value.
    pub fn get_multiple_by_sequence<Sequences>(
        &mut self,
        sequences: Sequences,
        in_transaction: bool,
    ) -> Result<HashMap<SequenceId, (ArcBytes<'static>, Option<ArcBytes<'static>>)>, Error>
    where
        Sequences: Iterator<Item = SequenceId>,
    {
        let results = RefCell::new(HashMap::new());
        self.file.execute(TreeSequenceGetter {
            keys: sequences,
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            key_evaluator: |sequence, index| {
                results
                    .borrow_mut()
                    .insert(sequence, (index.key.clone(), None));
                ScanEvaluation::ReadData
            },
            key_reader: |sequence, _index, value| {
                results
                    .borrow_mut()
                    .get_mut(&sequence)
                    .expect("reader can't be invoked without evaluator")
                    .1 = Some(value);
                Ok(())
            },
        })?;
        Ok(results.into_inner())
    }

    /// Retrieves the keys and indexes associated with one or more `sequences`.
    /// The index retrieved is the index of the key at the given [`SequenceId`].
    /// If a sequence is not found, it will not appear in the result list.
    pub fn get_multiple_indexes_by_sequence<Sequences>(
        &mut self,
        sequences: Sequences,
        in_transaction: bool,
    ) -> Result<Vec<SequenceIndex<Index>>, Error>
    where
        Sequences: Iterator<Item = SequenceId>,
    {
        let mut results = Vec::new();
        self.file.execute(TreeSequenceGetter {
            keys: sequences,
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            key_evaluator: |sequence, index| {
                results.push(SequenceIndex {
                    sequence,
                    index: index.clone(),
                });
                ScanEvaluation::Skip
            },
            key_reader: |_, _, _| unreachable!(),
        })?;
        Ok(results)
    }

    /// Retrieves the keys, values, and indexes associated with one or more
    /// `sequences`. The value retrieved is the value of the key at the given
    /// [`SequenceId`]. If a sequence is not found, it will not appear in the
    /// result map.
    pub fn get_multiple_with_indexes_by_sequence<Sequences>(
        &mut self,
        sequences: Sequences,
        in_transaction: bool,
    ) -> Result<HashMap<SequenceId, SequenceEntry<Index>>, Error>
    where
        Sequences: Iterator<Item = SequenceId>,
    {
        let results = RefCell::new(HashMap::new());
        self.file.execute(TreeSequenceGetter {
            keys: sequences,
            from_transaction: in_transaction,
            state: &self.state,
            vault: self.vault.as_deref(),
            cache: self.cache.as_ref(),
            key_evaluator: |sequence, index| {
                results.borrow_mut().insert(
                    sequence,
                    SequenceEntry {
                        index: index.clone(),
                        value: None,
                    },
                );
                ScanEvaluation::ReadData
            },
            key_reader: |sequence, _index, value| {
                results
                    .borrow_mut()
                    .get_mut(&sequence)
                    .expect("reader can't be invoked without evaluator")
                    .value = Some(value);
                Ok(())
            },
        })?;
        Ok(results.into_inner())
    }
}
/// A compaction process that runs in concert with a transaction manager.
pub struct TransactableCompaction<'a, Manager: FileManager> {
    /// The name of the tree being compacted.
    pub name: &'a str,
    /// The transaction manager.
    pub manager: &'a TransactionManager<Manager>,
}

struct TreeCompactor<'a, Root: root::Root, Manager: FileManager> {
    manager: &'a Manager,
    state: &'a State<Root>,
    vault: Option<&'a dyn AnyVault>,
    transactions: Option<TransactableCompaction<'a, Manager>>,
    scratch: &'a mut Vec<u8>,
}

impl<'a, Root, Manager>
    FileOp<Result<(Manager::File, TreeCompactionFinisher<'a, Root, Manager>), Error>>
    for TreeCompactor<'a, Root, Manager>
where
    Root: root::Root,
    Manager: FileManager,
{
    fn execute(
        self,
        file: &mut dyn File,
    ) -> Result<(Manager::File, TreeCompactionFinisher<'a, Root, Manager>), Error> {
        let current_path = file.id().path();
        let file_name = current_path
            .file_name()
            .ok_or_else(|| ErrorKind::message("could not retrieve file name"))?;
        let mut compacted_name = file_name.to_os_string();
        compacted_name.push(".compacting");
        let compacted_path = current_path
            .parent()
            .ok_or_else(|| ErrorKind::message("couldn't access parent of file"))?
            .join(compacted_name);

        if compacted_path.exists() {
            std::fs::remove_file(&compacted_path)?;
        }

        let transaction = self.transactions.as_ref().map(|transactions| {
            transactions
                .manager
                .new_transaction([transactions.name.as_bytes()])
        });
        let mut new_file = self.manager.open_for_append(&compacted_path)?;
        let mut writer = PagedWriter::new(None, &mut new_file, self.vault, None, 0)?;

        // Use the read state to list all the currently live chunks
        let mut copied_chunks = HashMap::new();
        let read_state = self.state.read();
        let mut temporary_header = read_state.root.clone();
        drop(read_state);
        temporary_header.copy_data_to(false, file, &mut copied_chunks, &mut writer, self.vault)?;

        // Now, do the same with the write state, which should be very fast,
        // since only nodes that have changed will need to be visited.
        let mut write_state = self.state.lock();
        write_state
            .root
            .copy_data_to(true, file, &mut copied_chunks, &mut writer, self.vault)?;

        save_tree(
            &mut write_state,
            self.vault,
            None,
            writer,
            self.scratch,
            true,
        )?;

        // Close any existing handles to the file. This ensures that once we
        // save the tree, new requests to the file manager will point to the new
        // file.

        Ok((
            new_file,
            TreeCompactionFinisher {
                write_state,
                state: self.state,
                _transaction: transaction,
            },
        ))
    }
}

struct TreeCompactionFinisher<'a, Root: root::Root, Manager: FileManager> {
    state: &'a State<Root>,
    write_state: MutexGuard<'a, ActiveState<Root>>,
    _transaction: Option<ManagedTransaction<Manager>>,
}

impl<'a, Root: root::Root, Manager: FileManager> TreeCompactionFinisher<'a, Root, Manager> {
    fn finish(mut self, new_file_id: u64) {
        self.write_state.file_id = Some(new_file_id);
        self.write_state.publish(self.state);
        drop(self);
    }
}
struct TreeWriter<'a, Root: root::Root> {
    state: &'a State<Root>,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    scratch: &'a mut Vec<u8>,
}

impl<'a, Root> FileOp<Result<(), Error>> for TreeWriter<'a, Root>
where
    Root: root::Root,
{
    fn execute(self, file: &mut dyn File) -> Result<(), Error> {
        let mut active_state = self.state.lock();
        if active_state.file_id != file.id().id() {
            return Err(Error::from(ErrorKind::TreeCompacted));
        }
        if active_state.root.dirty() {
            let data_block = PagedWriter::new(
                None,
                file,
                self.vault,
                self.cache,
                active_state.current_position,
            )?;

            self.scratch.clear();
            save_tree(
                &mut *active_state,
                self.vault,
                self.cache,
                data_block,
                self.scratch,
                true,
            )
        } else {
            Ok(())
        }
    }
}

struct TreeModifier<'a, 'm, Root: root::Root> {
    state: &'a State<Root>,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    modification: Option<Modification<'m, Root::Value, Root::Index>>,
    scratch: &'a mut Vec<u8>,
}

impl<'a, 'm, Root> FileOp<Result<Vec<ModificationResult<Root::Index>>, Error>>
    for TreeModifier<'a, 'm, Root>
where
    Root: root::Root,
{
    fn execute(
        mut self,
        file: &mut dyn File,
    ) -> Result<Vec<ModificationResult<Root::Index>>, Error> {
        let mut active_state = self.state.lock();
        if active_state.file_id != file.id().id() {
            return Err(Error::from(ErrorKind::TreeCompacted));
        }

        let mut data_block = PagedWriter::new(
            None,
            file,
            self.vault,
            self.cache,
            active_state.current_position,
        )?;

        let modification = self.modification.take().unwrap();
        let persistence_mode = modification.persistence_mode;
        let is_transactional = persistence_mode.transaction_id().is_some();
        let max_order = active_state.max_order;

        // Execute the modification
        let results = active_state
            .root
            .modify(modification, &mut data_block, max_order)?;

        if is_transactional {
            // Transactions will be written to disk later.
            let (_, new_position) = data_block.finish()?;
            active_state.current_position = new_position;
        } else {
            // Save the tree to disk immediately.
            self.scratch.clear();
            save_tree(
                &mut *active_state,
                self.vault,
                self.cache,
                data_block,
                self.scratch,
                persistence_mode.should_synchronize(),
            )?;
            active_state.publish(self.state);
        }

        Ok(results)
    }
}
#[allow(clippy::shadow_unrelated)] // It is related, but clippy can't tell.
fn save_tree<Root: root::Root>(
    active_state: &mut ActiveState<Root>,
    vault: Option<&dyn AnyVault>,
    cache: Option<&ChunkCache>,
    mut data_block: PagedWriter<'_>,
    scratch: &mut Vec<u8>,
    synchronize: bool,
) -> Result<(), Error> {
    scratch.clear();
    active_state.root.serialize(&mut data_block, scratch)?;
    let (file, after_data) = data_block.finish()?;
    active_state.current_position = after_data;

    // Write a new header.
    let mut header_block = PagedWriter::new(
        Some(Root::HEADER),
        file,
        vault,
        cache,
        active_state.current_position,
    )?;
    header_block.write_chunk(scratch)?;

    let (file, after_header) = header_block.finish()?;
    active_state.current_position = after_header;

    if synchronize {
        file.synchronize()?;
    }

    Ok(())
}

/// One or more keys.
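///
/// A minimal usage sketch (assuming the public re-export at
/// `nebari::tree::KeyRange`): the iterator yields the wrapped keys in their
/// original order.
///
/// ```rust
/// use nebari::tree::KeyRange;
///
/// let mut keys = KeyRange::new([&b"a"[..], &b"b"[..]].into_iter());
/// assert_eq!(keys.next(), Some(&b"a"[..]));
/// assert_eq!(keys.next(), Some(&b"b"[..]));
/// assert_eq!(keys.next(), None);
/// ```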
#[derive(Debug)]
pub struct KeyRange<I: Iterator<Item = Bytes>, Bytes: AsRef<[u8]>> {
    remaining_keys: I,
    current_key: Option<Bytes>,
    _bytes: PhantomData<Bytes>,
}

impl<I: Iterator<Item = Bytes>, Bytes: AsRef<[u8]>> KeyRange<I, Bytes> {
    /// Returns a new instance from the keys provided.
    pub fn new(mut keys: I) -> Self {
        Self {
            current_key: keys.next(),
            remaining_keys: keys,
            _bytes: PhantomData,
        }
    }

    fn current_key(&self) -> Option<&[u8]> {
        self.current_key.as_ref().map(Bytes::as_ref)
    }
}

impl<I: Iterator<Item = Bytes>, Bytes: AsRef<[u8]>> Iterator for KeyRange<I, Bytes> {
    type Item = Bytes;
    fn next(&mut self) -> Option<Bytes> {
        // Yield the currently buffered key while buffering the next one.
        let mut key = self.remaining_keys.next();
        std::mem::swap(&mut key, &mut self.current_key);
        key
    }
}

#[derive(Clone, Copy)]
/// The result of evaluating a key or node that was scanned.
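///
/// A minimal sketch of an evaluator callback (hypothetical, assuming the
/// public re-export at `nebari::tree::ScanEvaluation`): read short keys and
/// stop scanning at the first key of four or more bytes.
///
/// ```rust
/// use nebari::tree::ScanEvaluation;
///
/// let evaluate = |key: &[u8]| {
///     if key.len() < 4 {
///         ScanEvaluation::ReadData
///     } else {
///         ScanEvaluation::Stop
///     }
/// };
/// assert!(matches!(evaluate(b"abc"), ScanEvaluation::ReadData));
/// assert!(matches!(evaluate(b"abcd"), ScanEvaluation::Stop));
/// ```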
pub enum ScanEvaluation {
    /// Read the data for this entry.
    ReadData,
    /// Skip this entry's contained data.
    Skip,
    /// Stop scanning.
    Stop,
}

struct TreeGetter<
    'a,
    'keys,
    Root: root::Root,
    KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
    KeyReader: FnMut(ArcBytes<'static>, Root::Value, Root::Index) -> Result<(), Error>,
    Keys: Iterator<Item = &'keys [u8]>,
> {
    from_transaction: bool,
    state: &'a State<Root>,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    keys: Keys,
    key_evaluator: KeyEvaluator,
    key_reader: KeyReader,
}

impl<'a, 'keys, KeyEvaluator, KeyReader, Root, Keys> FileOp<Result<(), Error>>
    for TreeGetter<'a, 'keys, Root, KeyEvaluator, KeyReader, Keys>
where
    KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
    KeyReader: FnMut(ArcBytes<'static>, Root::Value, Root::Index) -> Result<(), Error>,
    Keys: Iterator<Item = &'keys [u8]>,
    Root: root::Root,
{
    fn execute(mut self, file: &mut dyn File) -> Result<(), Error> {
        if self.from_transaction {
            let state = self.state.lock();
            if state.file_id != file.id().id() {
                return Err(Error::from(ErrorKind::TreeCompacted));
            }

            state.root.get_multiple(
                &mut self.keys,
                &mut self.key_evaluator,
                &mut self.key_reader,
                file,
                self.vault,
                self.cache,
            )
        } else {
            let state = self.state.read();
            if state.file_id != file.id().id() {
                return Err(Error::from(ErrorKind::TreeCompacted));
            }

            state.root.get_multiple(
                &mut self.keys,
                &mut self.key_evaluator,
                &mut self.key_reader,
                file,
                self.vault,
                self.cache,
            )
        }
    }
}

struct TreeScanner<
    'a,
    'keys,
    CallerError,
    Root: root::Root,
    NodeEvaluator,
    KeyEvaluator,
    KeyReader,
    KeyRangeBounds,
> where
    NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
    KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
    KeyReader:
        FnMut(ArcBytes<'static>, &Root::Index, Root::Value) -> Result<(), AbortError<CallerError>>,
    KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    CallerError: Display + Debug,
{
    forwards: bool,
    from_transaction: bool,
    state: &'a State<Root>,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    range: &'keys KeyRangeBounds,
    node_evaluator: NodeEvaluator,
    key_evaluator: KeyEvaluator,
    key_reader: KeyReader,
    _phantom: PhantomData<&'keys [u8]>,
}

impl<
        'a,
        'keys,
        CallerError,
        Root: root::Root,
        NodeEvaluator,
        KeyEvaluator,
        KeyReader,
        KeyRangeBounds,
    > FileOp<Result<bool, AbortError<CallerError>>>
    for TreeScanner<
        'a,
        'keys,
        CallerError,
        Root,
        NodeEvaluator,
        KeyEvaluator,
        KeyReader,
        KeyRangeBounds,
    >
where
    NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
    KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
    KeyReader:
        FnMut(ArcBytes<'static>, &Root::Index, Root::Value) -> Result<(), AbortError<CallerError>>,
    KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    CallerError: Display + Debug,
{
    fn execute(mut self, file: &mut dyn File) -> Result<bool, AbortError<CallerError>> {
        if self.from_transaction {
            let state = self.state.lock();
            if state.file_id != file.id().id() {
                return Err(AbortError::Nebari(Error::from(ErrorKind::TreeCompacted)));
            }

            state.root.scan(
                self.range,
                ScanArgs::new(
                    self.forwards,
                    &mut self.node_evaluator,
                    &mut self.key_evaluator,
                    &mut self.key_reader,
                ),
                file,
                self.vault,
                self.cache,
            )
        } else {
            let state = self.state.read();
            if state.file_id != file.id().id() {
                return Err(AbortError::Nebari(Error::from(ErrorKind::TreeCompacted)));
            }

            state.root.scan(
                self.range,
                ScanArgs::new(
                    self.forwards,
                    &mut self.node_evaluator,
                    &mut self.key_evaluator,
                    &mut self.key_reader,
                ),
                file,
                self.vault,
                self.cache,
            )
        }
    }
}

struct TreeSequenceGetter<
    'a,
    Index: EmbeddedIndex<ArcBytes<'static>> + Clone + Debug + 'static,
    KeyEvaluator: for<'k> FnMut(SequenceId, &'k BySequenceIndex<Index>) -> ScanEvaluation,
    KeyReader: FnMut(SequenceId, BySequenceIndex<Index>, ArcBytes<'static>) -> Result<(), Error>,
    Keys: Iterator<Item = SequenceId>,
> {
    from_transaction: bool,
    state: &'a State<VersionedTreeRoot<Index>>,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    keys: Keys,
    key_evaluator: KeyEvaluator,
    key_reader: KeyReader,
}

impl<'a, KeyEvaluator, KeyReader, Index, Keys> FileOp<Result<(), Error>>
    for TreeSequenceGetter<'a, Index, KeyEvaluator, KeyReader, Keys>
where
    KeyEvaluator: for<'k> FnMut(SequenceId, &'k BySequenceIndex<Index>) -> ScanEvaluation,
    KeyReader: FnMut(SequenceId, BySequenceIndex<Index>, ArcBytes<'static>) -> Result<(), Error>,
    Keys: Iterator<Item = SequenceId>,
    Index: EmbeddedIndex<ArcBytes<'static>> + Clone + Debug + 'static,
{
    fn execute(mut self, file: &mut dyn File) -> Result<(), Error> {
        if self.from_transaction {
            let state = self.state.lock();
            if state.file_id != file.id().id() {
                return Err(Error::from(ErrorKind::TreeCompacted));
            }

            state.root.by_sequence_root.get_multiple(
                &mut self.keys.map(|sequence| sequence.0.to_be_bytes()),
                |key, index| {
                    (self.key_evaluator)(SequenceId::try_from(key.as_slice()).unwrap(), index)
                },
                |key, value, index| {
                    (self.key_reader)(SequenceId::try_from(key.as_slice()).unwrap(), index, value)
                },
                file,
                self.vault,
                self.cache,
            )
        } else {
            let state = self.state.read();
            if state.file_id != file.id().id() {
                return Err(Error::from(ErrorKind::TreeCompacted));
            }

            state.root.by_sequence_root.get_multiple(
                &mut self.keys.map(|sequence| sequence.0.to_be_bytes()),
                |key, index| {
                    (self.key_evaluator)(SequenceId::try_from(key.as_slice()).unwrap(), index)
                },
                |key, value, index| {
                    (self.key_reader)(SequenceId::try_from(key.as_slice()).unwrap(), index, value)
                },
                file,
                self.vault,
                self.cache,
            )
        }
    }
}

struct TreeSequenceScanner<
    'a,
    'keys,
    KeyEvaluator,
    KeyRangeBounds,
    DataCallback,
    CallerError,
    Index,
> where
    KeyEvaluator: FnMut(&ArcBytes<'static>, &BySequenceIndex<Index>) -> ScanEvaluation,
    KeyRangeBounds: RangeBounds<&'keys [u8]> + ?Sized,
    DataCallback:
        FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
    CallerError: Display + Debug,
    Index: EmbeddedIndex<ArcBytes<'static>> + Clone + Debug + 'static,
{
    forwards: bool,
    from_transaction: bool,
    state: &'a State<VersionedTreeRoot<Index>>,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    range: &'keys KeyRangeBounds,
    key_evaluator: KeyEvaluator,
    data_callback: DataCallback,
}

impl<'a, 'keys, KeyEvaluator, KeyRangeBounds, DataCallback, CallerError, Index>
    FileOp<Result<(), AbortError<CallerError>>>
    for TreeSequenceScanner<
        'a,
        'keys,
        KeyEvaluator,
        KeyRangeBounds,
        DataCallback,
        CallerError,
        Index,
    >
where
    KeyEvaluator: FnMut(&ArcBytes<'static>, &BySequenceIndex<Index>) -> ScanEvaluation,
    KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
    DataCallback:
        FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
    CallerError: Display + Debug,
    Index: EmbeddedIndex<ArcBytes<'static>> + Clone + Debug + 'static,
{
    fn execute(self, file: &mut dyn File) -> Result<(), AbortError<CallerError>> {
        let Self {
            forwards,
            from_transaction,
            state,
            vault,
            cache,
            range,
            mut key_evaluator,
            mut data_callback,
            ..
        } = self;
        let mapped_data_callback =
            |key: ArcBytes<'static>, index: &BySequenceIndex<Index>, data: ArcBytes<'static>| {
                let sequence = SequenceId(BigEndian::read_u64(&key));
                (data_callback)(
                    KeySequence {
                        key: index.key.clone(),
                        sequence,
                        last_sequence: index.last_sequence,
                        embedded: index.embedded.clone(),
                    },
                    data,
                )
            };
        if from_transaction {
            let state = state.lock();
            if state.file_id != file.id().id() {
                return Err(AbortError::Nebari(Error::from(ErrorKind::TreeCompacted)));
            }

            state
                .root
                .by_sequence_root
                .scan(
                    range,
                    &mut ScanArgs::new(
                        forwards,
                        |_, _, _| ScanEvaluation::ReadData,
                        &mut key_evaluator,
                        mapped_data_callback,
                    ),
                    file,
                    vault,
                    cache,
                    0,
                )
                .map(|_| {})
        } else {
            let state = state.read();
            if state.file_id != file.id().id() {
                return Err(AbortError::Nebari(Error::from(ErrorKind::TreeCompacted)));
            }

            state
                .root
                .by_sequence_root
                .scan(
                    range,
                    &mut ScanArgs::new(
                        forwards,
                        |_, _, _| ScanEvaluation::ReadData,
                        &mut key_evaluator,
                        mapped_data_callback,
                    ),
                    file,
                    vault,
                    cache,
                    0,
                )
                .map(|_| {})
        }
    }
}

/// Writes data in pages, allowing for quick scanning through the file.
pub struct PagedWriter<'a> {
    file: &'a mut dyn File,
    vault: Option<&'a dyn AnyVault>,
    cache: Option<&'a ChunkCache>,
    position: u64,
    offset: usize,
    buffered_write: [u8; WRITE_BUFFER_SIZE],
}

impl<'a> Deref for PagedWriter<'a> {
    type Target = dyn File;

    fn deref(&self) -> &Self::Target {
        self.file
    }
}

impl<'a> DerefMut for PagedWriter<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.file
    }
}

const WRITE_BUFFER_SIZE: usize = 8 * 1024;

impl<'a> PagedWriter<'a> {
    fn new(
        header: Option<PageHeader>,
        file: &'a mut dyn File,
        vault: Option<&'a dyn AnyVault>,
        cache: Option<&'a ChunkCache>,
        position: u64,
    ) -> Result<Self, Error> {
        let mut writer = Self {
            file,
            vault,
            cache,
            position,
            offset: 0,
            buffered_write: [0; WRITE_BUFFER_SIZE],
        };
        if let Some(header) = header {
            // Ensure alignment if we have a header. The padding bytes are
            // filled with the header's byte value, and the final four bytes
            // become the `b"Nbr"` magic code followed by the header byte
            // itself.
            #[allow(clippy::cast_possible_truncation)]
            let padding_needed = PAGE_SIZE - (writer.position % PAGE_SIZE as u64) as usize;
            let mut padding_and_header = Vec::new();
            padding_and_header.resize(padding_needed + 4, header as u8);
            padding_and_header.splice(
                padding_and_header.len() - 4..padding_and_header.len() - 1,
                b"Nbr".iter().copied(),
            );
            writer.write(&padding_and_header)?;
        }
        if writer.current_position() == 0 {
            // Write a magic code.
            writer.write(b"Nbri")?;
        }
        Ok(writer)
    }

    const fn current_position(&self) -> u64 {
        self.position + self.offset as u64
    }

    /// Copies as much of `data` as fits into the write buffer, flushing the
    /// buffer to the file when it becomes full, and advances `data` past the
    /// copied bytes.
    fn fill_and_write_buffer(&mut self, data: &mut &[u8]) -> Result<(), Error> {
        let bytes_available = self.buffered_write.len() - self.offset;
        let bytes_to_copy = bytes_available.min(data.len());
        let new_offset = self.offset + bytes_to_copy;
        let (data_to_copy, remaining) = data.split_at(bytes_to_copy);
        self.buffered_write[self.offset..new_offset].copy_from_slice(data_to_copy);
        self.offset = new_offset;
        if self.offset == self.buffered_write.len() {
            self.commit()?;
        }

        *data = remaining;
        Ok(())
    }

    fn write(&mut self, mut data: &[u8]) -> Result<usize, Error> {
        let bytes_written = data.len();

        // Fill the write buffer first, flushing it if it becomes full.
        self.fill_and_write_buffer(&mut data)?;

        if data.len() > self.buffered_write.len() {
            // The remaining data is larger than the buffer; bypass it and
            // write directly to the file.
            self.file.write_all(data)?;
            self.position += data.len() as u64;
        } else {
            // The remainder fits in the (now empty) buffer.
            self.fill_and_write_buffer(&mut data)?;
        }
        Ok(bytes_written)
    }

    fn commit_if_needed(&mut self) -> Result<(), Error> {
        if self.offset > 0 {
            self.commit()?;
        }
        Ok(())
    }

    fn commit(&mut self) -> Result<(), Error> {
        self.file.write_all(&self.buffered_write[0..self.offset])?;
        self.position += self.offset as u64;
        self.offset = 0;
        Ok(())
    }

    /// Writes a chunk of data to the file, after possibly encrypting it.
    /// Returns the position that this chunk can be read from in the file.
    #[allow(clippy::cast_possible_truncation)]
    pub fn write_chunk(&mut self, contents: &[u8]) -> Result<u64, Error> {
        let possibly_encrypted = self.vault.as_ref().map_or_else(
            || Ok(Cow::Borrowed(contents)),
            |vault| vault.encrypt(contents).map(Cow::Owned),
        )?;
        let length =
            u32::try_from(possibly_encrypted.len()).map_err(|_| ErrorKind::ValueTooLarge)?;
        let crc = CRC32.checksum(&possibly_encrypted);
        let position = self.current_position();

        self.write_u32::<BigEndian>(length)?;
        self.write_u32::<BigEndian>(crc)?;
        self.write(&possibly_encrypted)?;

        if let (Some(cache), Some(file_id)) = (self.cache, self.file.id().id()) {
            if cache.max_chunk_size() >= contents.len() {
                cache.insert(
                    file_id,
                    position,
                    ArcBytes::owned(possibly_encrypted.to_vec()),
                );
            }
        }

        Ok(position)
    }

    /// Writes a chunk of data to the file, after possibly encrypting it, and
    /// inserts the contents into the chunk cache. Returns the position that
    /// this chunk can be read from in the file.
    #[allow(clippy::cast_possible_truncation)]
    pub fn write_chunk_cached(&mut self, contents: ArcBytes<'static>) -> Result<u64, Error> {
        let position = self.write_chunk(&contents)?;

        if let (Some(cache), Some(file_id)) = (self.cache, self.file.id().id()) {
            cache.insert(file_id, position, contents);
        }

        Ok(position)
    }

    /// Reads a "chunk" of data located at `position`. `position` should be a
    /// location previously returned by [`Self::write_chunk()`].
    pub fn read_chunk(&mut self, position: u64) -> Result<CacheEntry, Error> {
        read_chunk(position, false, self.file, self.vault, self.cache)
    }

    /// Copies a chunk from `original_position` in file `from_file` to this
    /// file. This function will update `copied_chunks` with the newly written
    /// location and return the new position. If `copied_chunks` already
    /// contains `original_position`, the already copied position is returned.
    pub fn copy_chunk_from<Hasher: BuildHasher>(
        &mut self,
        original_position: u64,
        from_file: &mut dyn File,
        copied_chunks: &mut std::collections::HashMap<u64, u64, Hasher>,
        vault: Option<&dyn AnyVault>,
    ) -> Result<u64, Error> {
        if original_position == 0 {
            Ok(0)
        } else if let Some(new_position) = copied_chunks.get(&original_position) {
            Ok(*new_position)
        } else {
            // Since these are one-time copies, and receiving a Decoded entry
            // makes things tricky, we're going to not use caching for reads
            // here. This gives the added benefit for a long-running server to
            // ensure it's doing CRC checks occasionally as it copies itself.
            let chunk = match read_chunk(original_position, true, from_file, vault, None)? {
                CacheEntry::ArcBytes(buffer) => buffer,
                CacheEntry::Decoded(_) => unreachable!(),
            };
            let new_location = self.write_chunk(&chunk)?;
            copied_chunks.insert(original_position, new_location);
            Ok(new_location)
        }
    }

    fn write_u32<B: ByteOrder>(&mut self, value: u32) -> Result<usize, Error> {
        let mut buffer = [0_u8; 4];
        B::write_u32(&mut buffer, value);
        self.write(&buffer)
    }

    fn finish(mut self) -> Result<(&'a mut dyn File, u64), Error> {
        self.commit_if_needed()?;
        Ok((self.file, self.position))
    }
}

#[allow(clippy::cast_possible_truncation)]
#[cfg_attr(feature = "tracing", tracing::instrument(skip(file, vault, cache)))]
fn read_chunk(
    position: u64,
    validate_crc: bool,
    file: &mut dyn File,
    vault: Option<&dyn AnyVault>,
    cache: Option<&ChunkCache>,
) -> Result<CacheEntry, Error> {
    if let (Some(cache), Some(file_id)) = (cache, file.id().id()) {
        if let Some(entry) = cache.get(file_id, position) {
            return Ok(entry);
        }
    }

    // Read the chunk header
    let mut header = [0_u8; 8];
    file.seek(SeekFrom::Start(position))?;
    file.read_exact(&mut header)?;
    let length = BigEndian::read_u32(&header[0..4]) as usize;

    let mut scratch = Vec::new();
    scratch.resize(length, 0);
    file.read_exact(&mut scratch)?;

    if validate_crc {
        let crc = BigEndian::read_u32(&header[4..8]);
        let computed_crc = CRC32.checksum(&scratch);
        if crc != computed_crc {
            return Err(Error::data_integrity(format!(
                "crc32 failure on chunk at position {}",
                position
            )));
        }
    }

    let decrypted = ArcBytes::from(match vault {
        Some(vault) => vault.decrypt(&scratch)?,
        None => scratch,
    });

    if let (Some(cache), Some(file_id)) = (cache, file.id().id()) {
        cache.insert(file_id, position, decrypted.clone());
    }

    Ok(CacheEntry::ArcBytes(decrypted))
}

/// Returns the "order" (maximum number of children per node) to use for the
/// database. This function is meant to keep the tree shallow while still
/// keeping the nodes small along the way. This is an approximation that
/// always returns an order larger than what is needed, but it will never
/// return a value larger than `max_order`.
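///
/// A minimal sketch of the behavior (assuming the public re-export at
/// `nebari::tree::dynamic_order`):
///
/// ```rust
/// use nebari::tree::dynamic_order;
///
/// // Small trees are clamped to a minimum order of 4.
/// assert_eq!(dynamic_order(0, None), 4);
/// // Large record counts are clamped to the provided maximum order.
/// assert_eq!(dynamic_order(u64::MAX, Some(10)), 10);
/// ```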
#[allow(
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss
)]
#[must_use]
pub fn dynamic_order(number_of_records: u64, max_order: Option<usize>) -> usize {
    // Current approximation is the 3rd root.
    let max_order = max_order.unwrap_or(DEFAULT_MAX_ORDER);
    if number_of_records > max_order.pow(3) as u64 {
        max_order
    } else {
        let estimated_order = 4.max((number_of_records as f64).cbrt() as usize);
        max_order.min(estimated_order)
    }
}

/// A range of u64 values that is able to be used as keys in a tree scan, once
/// [borrowed](BorrowByteRange::borrow_as_bytes()).
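///
/// A minimal sketch (assuming the public re-exports at `nebari::tree`): the
/// bounds are preserved both as `u64`s and as big-endian byte arrays.
///
/// ```rust
/// use std::ops::{Bound, RangeBounds};
///
/// use nebari::tree::{BorrowByteRange, U64Range};
///
/// let range = U64Range::new(1_u64..4);
/// assert_eq!(range.start_bound(), Bound::Included(&1));
/// assert_eq!(range.end_bound(), Bound::Excluded(&4));
///
/// let start = 1_u64.to_be_bytes();
/// let end = 4_u64.to_be_bytes();
/// let bytes = range.borrow_as_bytes();
/// assert_eq!(bytes.start_bound(), Bound::Included(&&start[..]));
/// assert_eq!(bytes.end_bound(), Bound::Excluded(&&end[..]));
/// ```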
#[derive(Debug)]
pub struct U64Range {
    start_bound: Bound<u64>,
    start_bound_bytes: Bound<[u8; 8]>,
    end_bound: Bound<u64>,
    end_bound_bytes: Bound<[u8; 8]>,
}

impl RangeBounds<u64> for U64Range {
    fn start_bound(&self) -> Bound<&u64> {
        match &self.start_bound {
            Bound::Included(value) => Bound::Included(value),
            Bound::Excluded(value) => Bound::Excluded(value),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    fn end_bound(&self) -> Bound<&u64> {
        match &self.end_bound {
            Bound::Included(value) => Bound::Included(value),
            Bound::Excluded(value) => Bound::Excluded(value),
            Bound::Unbounded => Bound::Unbounded,
        }
    }
}

/// A borrowed range in byte form.
#[derive(Debug, Clone)]
pub struct BorrowedRange<'a> {
    /// The start bound for this range.
    pub start: Bound<&'a [u8]>,
    /// The end bound for this range.
    pub end: Bound<&'a [u8]>,
}

/// Borrows a range.
pub trait BorrowByteRange<'a> {
    /// Returns a borrowed version of the byte representation of the original range.
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a>;
}

impl<'a> BorrowByteRange<'a> for Range<Vec<u8>> {
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
        BorrowedRange {
            start: Bound::Included(&self.start[..]),
            end: Bound::Excluded(&self.end[..]),
        }
    }
}

impl<'a> BorrowByteRange<'a> for U64Range {
    fn borrow_as_bytes(&'a self) -> BorrowedRange<'a> {
        BorrowedRange {
            start: match &self.start_bound_bytes {
                Bound::Included(bytes) => Bound::Included(&bytes[..]),
                Bound::Excluded(bytes) => Bound::Excluded(&bytes[..]),
                Bound::Unbounded => Bound::Unbounded,
            },
            end: match &self.end_bound_bytes {
                Bound::Included(bytes) => Bound::Included(&bytes[..]),
                Bound::Excluded(bytes) => Bound::Excluded(&bytes[..]),
                Bound::Unbounded => Bound::Unbounded,
            },
        }
    }
}

impl<'a, 'b: 'a> RangeBounds<&'a [u8]> for BorrowedRange<'b> {
    fn start_bound(&self) -> Bound<&&'a [u8]> {
        match &self.start {
            Bound::Included(value) => Bound::Included(value),
            Bound::Excluded(value) => Bound::Excluded(value),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    fn end_bound(&self) -> Bound<&&'a [u8]> {
        match &self.end {
            Bound::Included(value) => Bound::Included(value),
            Bound::Excluded(value) => Bound::Excluded(value),
            Bound::Unbounded => Bound::Unbounded,
        }
    }
}

impl U64Range {
    /// Creates a new instance from the range passed in.
    pub fn new<B: RangeBounds<T>, T: Clone + Into<u64>>(bounds: B) -> Self {
        Self {
            start_bound: match bounds.start_bound() {
                Bound::Included(id) => Bound::Included(id.clone().into()),
                Bound::Excluded(id) => Bound::Excluded(id.clone().into()),
                Bound::Unbounded => Bound::Unbounded,
            },
            start_bound_bytes: match bounds.start_bound() {
                Bound::Included(id) => Bound::Included(id.clone().into().to_be_bytes()),
                Bound::Excluded(id) => Bound::Excluded(id.clone().into().to_be_bytes()),
                Bound::Unbounded => Bound::Unbounded,
            },
            end_bound: match bounds.end_bound() {
                Bound::Included(id) => Bound::Included(id.clone().into()),
                Bound::Excluded(id) => Bound::Excluded(id.clone().into()),
                Bound::Unbounded => Bound::Unbounded,
            },
            end_bound_bytes: match bounds.end_bound() {
                Bound::Included(id) => Bound::Included(id.clone().into().to_be_bytes()),
                Bound::Excluded(id) => Bound::Excluded(id.clone().into().to_be_bytes()),
                Bound::Unbounded => Bound::Unbounded,
            },
        }
    }
}

/// The key and value of an entry.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct KeyValue<Key, Value> {
    /// The key of this entry.
    pub key: Key,
    /// The value of this entry.
    pub value: Value,
}

/// The key and index of an entry.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct KeyIndex<Index> {
    /// The key of this entry.
    pub key: ArcBytes<'static>,
    /// The index of this entry.
    pub index: Index,
}

/// A key and index of an entry from a tree with [`Root`] `R`.
pub type TreeKeyIndex<R> = KeyIndex<<R as Root>::Index>;

/// The value and index of an entry.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct ValueIndex<Value, Index> {
    /// The value of this entry.
    pub value: Value,
    /// The index of this entry.
    pub index: Index,
}

/// A value and index of an entry from a tree with [`Root`] `R`.
pub type TreeValueIndex<R> = ValueIndex<<R as Root>::Value, <R as Root>::Index>;

/// A complete entry in a tree.
#[derive(Eq, PartialEq, Clone, Debug, Default)]
pub struct Entry<Value, Index> {
    /// The key of this entry.
    pub key: ArcBytes<'static>,
    /// The value of this entry.
    pub value: Value,
    /// The index of this entry.
    pub index: Index,
}

/// An entry from a tree with [`Root`] `R`.
pub type TreeEntry<R> = Entry<<R as Root>::Value, <R as Root>::Index>;

/// An index that is embeddable within a tree.
///
/// An index is a computed value that is stored directly within the B-Tree
/// structure. Because these are encoded directly onto the nodes, they should be
/// kept small for better performance.
pub trait EmbeddedIndex<Value>: Serializable + Clone + Debug + Send + Sync + 'static {
    /// The reduced representation of this index.
    type Reduced: Serializable + Clone + Debug + Send + Sync + 'static;
    /// The reducer that reduces arrays of `Self` or `Self::Reduced` into `Self::Reduced`.
    type Indexer: Indexer<Value, Self> + Reducer<Self, Self::Reduced>;
}

/// A type that can be serialized and deserialized.
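///
/// A minimal sketch of an implementor; `LengthIndex` is hypothetical and not
/// part of this crate, and the example assumes `Error: From<std::io::Error>`:
///
/// ```rust
/// use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
/// use nebari::{tree::Serializable, Error};
///
/// // A hypothetical index that stores a single `u32` (e.g., a value's length).
/// #[derive(Clone, Debug)]
/// struct LengthIndex(u32);
///
/// impl Serializable for LengthIndex {
///     fn serialize_to<W: WriteBytesExt>(&self, writer: &mut W) -> Result<usize, Error> {
///         writer.write_u32::<BigEndian>(self.0)?;
///         Ok(4)
///     }
///
///     fn deserialize_from<R: ReadBytesExt>(reader: &mut R) -> Result<Self, Error> {
///         Ok(Self(reader.read_u32::<BigEndian>()?))
///     }
/// }
/// ```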
pub trait Serializable: Send + Sync + Sized + 'static {
    /// Serializes into `writer` and returns the number of bytes written.
    fn serialize_to<W: WriteBytesExt>(&self, writer: &mut W) -> Result<usize, Error>;
    /// Deserializes from `reader`, and returns the deserialized instance.
    /// Implementors should not expect for the reader to be fully consumed at
    /// the end of this call.
    fn deserialize_from<R: ReadBytesExt>(reader: &mut R) -> Result<Self, Error>;
}

impl<Value> EmbeddedIndex<Value> for () {
    type Reduced = Self;
    type Indexer = Self;
}

impl<Value> Indexer<Value, ()> for () {
    fn index(&self, _key: &ArcBytes<'_>, _value: Option<&Value>) -> Self {}
}

impl Serializable for () {
    fn serialize_to<W: WriteBytesExt>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }

    fn deserialize_from<R: ReadBytesExt>(_reader: &mut R) -> Result<Self, Error> {
        Ok(())
    }
}

/// A single key's modification result.
pub struct ModificationResult<Index> {
    /// The key that was changed.
    pub key: ArcBytes<'static>,
    /// The updated index, if the key is still present.
    pub index: Option<Index>,
}

/// The result of a change to a [`BTreeNode`].
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum ChangeResult {
    /// No changes were made.
    Unchanged,
    /// The node modified is now empty and should be removed.
    Remove,
    /// The modified node now has fewer entries than the tree should have, and
    /// its children should be absorbed into neighbors.
    Absorb,
    /// The node was changed.
    Changed,
    /// The modified node now has more entries than the tree should have, and
    /// it should be split.
    Split,
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashSet},
        convert::Infallible,
        path::Path,
    };

    use nanorand::{Pcg64, Rng};
    use tempfile::NamedTempFile;

    use super::*;
    use crate::io::{
        any::AnyFileManager,
        fs::{StdFile, StdFileManager},
        memory::{MemoryFile, MemoryFileManager, MemoryFileOpener},
        ManagedFileOpener,
    };

    fn test_paged_write(offset: usize, length: usize) -> Result<(), Error> {
        let mut file = MemoryFileOpener.open_for_append(format!("test-{}-{}", offset, length))?;
        let mut paged_writer =
            PagedWriter::new(Some(PageHeader::VersionedHeader), &mut file, None, None, 0)?;

        let mut scratch = Vec::new();
        scratch.resize(offset.max(length), 0);
        if offset > 0 {
            paged_writer.write(&scratch[..offset])?;
        }
        scratch.fill(1);
        let written_position = paged_writer.write_chunk(&scratch[..length])?;
        drop(paged_writer.finish());

        match read_chunk(written_position, true, &mut file, None, None)? {
            CacheEntry::ArcBytes(data) => {
                assert_eq!(data.len(), length);
                assert!(data.iter().all(|i| i == 1));
            }
            CacheEntry::Decoded(_) => unreachable!(),
        }

        drop(file);

        Ok(())
    }

    /// Tests the writing of pages at various boundaries. This should cover
    /// every edge case: offset is on page 1/2/3, write stays on first page or
    /// lands on page 1/2/3 end or extends onto page 4.
    #[test]
    fn paged_writer() {
        for offset in 0..=PAGE_SIZE * 2 {
            for length in [
                1,
                PAGE_SIZE / 2,
                PAGE_SIZE,
                PAGE_SIZE * 3 / 2,
                PAGE_SIZE * 2,
            ] {
                if let Err(err) = test_paged_write(offset, length) {
                    unreachable!(
                        "paged writer failure at offset {} length {}: {:?}",
                        offset, length, err
                    );
                }
            }
        }
    }

    fn insert_one_record<R: Root<Value = ArcBytes<'static>> + Default, F: ManagedFile>(
        context: &Context<F::Manager>,
        file_path: &Path,
        ids: &mut HashSet<u64>,
        rng: &mut Pcg64,
        max_order: Option<usize>,
    ) {
        let id = loop {
            let id = rng.generate::<u64>();
            if ids.insert(id) {
                break id;
            }
        };
        let id_buffer = ArcBytes::from(id.to_be_bytes().to_vec());
        {
            let mut tree = TreeFile::<R, F>::write(
                file_path,
                State::new(None, max_order, R::default()),
                context,
                None,
            )
            .unwrap();
            tree.set(None, id_buffer.clone(), ArcBytes::from(b"hello world"))
                .unwrap();

            // This shouldn't have to scan the file, as the data fits in memory.
            let value = tree.get(&id_buffer, false).unwrap();
            assert_eq!(&*value.unwrap(), b"hello world");
        }

        // Try loading the file up and retrieving the data.
        {
            let mut tree = TreeFile::<R, F>::write(
                file_path,
                State::new(None, max_order, R::default()),
                context,
                None,
            )
            .unwrap();
            let value = tree.get(&id_buffer, false).unwrap();
            assert_eq!(&*value.unwrap(), b"hello world");
        }
    }

    fn remove_one_record<R: Root<Value = ArcBytes<'static>> + Default, F: ManagedFile>(
        context: &Context<F::Manager>,
        file_path: &Path,
        id: u64,
        max_order: Option<usize>,
    ) {
        let id_buffer = ArcBytes::from(id.to_be_bytes().to_vec());
        {
            let file = context.file_manager.append(file_path).unwrap();
            let state = State::new(None, max_order, R::default());
            TreeFile::<R, F>::initialize_state(&state, file.id(), context, None).unwrap();
            let mut tree =
                TreeFile::<R, F>::new(file, state, context.vault.clone(), context.cache.clone())
                    .unwrap();
            tree.modify(Modification {
                persistence_mode: PersistenceMode::Sync,
                keys: vec![id_buffer.clone()],
                operation: Operation::Remove,
            })
            .unwrap();

            // The row should no longer exist in memory.
            let value = tree.get(&id_buffer, false).unwrap();
            assert_eq!(value, None);
        }

        // Try loading the file up and retrieving the data.
        {
            let file = context.file_manager.append(file_path).unwrap();
            let state = State::default();
            TreeFile::<R, F>::initialize_state(&state, file.id(), context, None).unwrap();

            let mut tree =
                TreeFile::<R, F>::new(file, state, context.vault.clone(), context.cache.clone())
                    .unwrap();
            let value = tree.get(&id_buffer, false).unwrap();
            assert_eq!(value, None);
        }
    }

    #[test]
    fn simple_inserts() {
        const ORDER: usize = 4;

        let mut rng = Pcg64::new_seed(1);
        let context = Context {
            file_manager: StdFileManager::default(),
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new("btree-tests");
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");
        let mut ids = HashSet::new();
        // Insert up to the limit of a LEAF, which is ORDER - 1.
        for _ in 0..ORDER - 1 {
            insert_one_record::<Versioned, StdFile>(
                &context,
                &file_path,
                &mut ids,
                &mut rng,
                Some(ORDER),
            );
        }
        println!("Successfully inserted up to ORDER - 1 nodes.");

        // The next record will split the node.
        insert_one_record::<Versioned, StdFile>(
            &context,
            &file_path,
            &mut ids,
            &mut rng,
            Some(ORDER),
        );
        println!("Successfully introduced one layer of depth.");

        // Insert a lot more.
        for _ in 0..1_000 {
            insert_one_record::<Versioned, StdFile>(
                &context,
                &file_path,
                &mut ids,
                &mut rng,
                Some(ORDER),
            );
        }
    }

    fn remove<R: Root<Value = ArcBytes<'static>> + Default>(label: &str) {
        const ORDER: usize = 4;

        // We've seen a couple of failures in CI, but have never been able to
        // reproduce locally. There used to be a small bit of randomness that
        // wasn't deterministic in the conversion between a HashSet and a Vec
        // for the IDs. This randomness has been removed, and instead we're now
        // embracing running a randomly seeded test -- and logging the seed that
        // fails so that we can attempt to reproduce it outside of CI.

        let mut seed_rng = Pcg64::new();
        let seed = seed_rng.generate();
        println!("Seeding removal {} with {}", label, seed);
        let mut rng = Pcg64::new_seed(seed);
        let context = Context {
            file_manager: StdFileManager::default(),
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("btree-removals-{}", label));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");
        let mut ids = HashSet::new();
        for _ in 0..1000 {
            insert_one_record::<R, StdFile>(&context, &file_path, &mut ids, &mut rng, Some(ORDER));
        }

        let mut ids = ids.into_iter().collect::<Vec<_>>();
        ids.sort_unstable();
        rng.shuffle(&mut ids);

        // Remove each of the records.
        for id in ids {
            remove_one_record::<R, StdFile>(&context, &file_path, id, Some(ORDER));
        }

        // Test being able to add a record again.
        insert_one_record::<R, StdFile>(
            &context,
            &file_path,
            &mut HashSet::default(),
            &mut rng,
            Some(ORDER),
        );
    }

    #[test]
    fn remove_versioned() {
        remove::<Versioned>("versioned");
    }

    #[test]
    fn remove_unversioned() {
        remove::<Unversioned>("unversioned");
    }

    #[test]
    fn spam_insert_std_versioned() {
        spam_insert::<Versioned, StdFile>("std-versioned");
    }

    #[test]
    fn spam_insert_std_unversioned() {
        spam_insert::<Unversioned, StdFile>("std-unversioned");
    }

    fn spam_insert<R: Root<Value = ArcBytes<'static>> + Default, F: ManagedFile>(name: &str) {
        const RECORDS: usize = 1_000;
        let mut rng = Pcg64::new_seed(1);
        let ids = (0..RECORDS).map(|_| rng.generate::<u64>());
        let context = Context {
            file_manager: F::Manager::default(),
            vault: None,
            cache: Some(ChunkCache::new(100, 160_384)),
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("spam-inserts-{}", name));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");
        let state = State::default();
        let mut tree = TreeFile::<R, F>::write(file_path, state, &context, None).unwrap();
        for (_index, id) in ids.enumerate() {
            let id_buffer = ArcBytes::from(id.to_be_bytes().to_vec());
            tree.set(None, id_buffer.clone(), ArcBytes::from(b"hello world"))
                .unwrap();
        }
    }

    #[test]
    fn std_bulk_insert_versioned() {
        bulk_insert::<Versioned, _>("std-versioned", StdFileManager::default());
    }

    #[test]
    fn memory_bulk_insert_versioned() {
        bulk_insert::<Versioned, _>("memory-versioned", MemoryFileManager::default());
    }

    #[test]
    fn any_bulk_insert_versioned() {
        bulk_insert::<Versioned, _>("any-versioned", AnyFileManager::memory());
        bulk_insert::<Versioned, _>("any-versioned", AnyFileManager::std());
    }

    #[test]
    fn std_bulk_insert_unversioned() {
        bulk_insert::<Unversioned, _>("std-unversioned", StdFileManager::default());
    }

    #[test]
    fn memory_bulk_insert_unversioned() {
        bulk_insert::<Unversioned, _>("memory-unversioned", MemoryFileManager::default());
    }

    #[test]
    fn any_bulk_insert_unversioned() {
        bulk_insert::<Unversioned, _>("any-unversioned", AnyFileManager::memory());
        bulk_insert::<Unversioned, _>("any-unversioned", AnyFileManager::std());
    }

    fn bulk_insert<R: Root<Value = ArcBytes<'static>> + Default, M: FileManager>(
        name: &str,
        file_manager: M,
    ) {
        const RECORDS_PER_BATCH: usize = 10;
        const BATCHES: usize = 1000;
        let mut rng = Pcg64::new_seed(1);
        let context = Context {
            file_manager,
            vault: None,
            cache: Some(ChunkCache::new(100, 160_384)),
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("bulk-inserts-{}", name));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");
        let state = State::default();
        let mut tree = TreeFile::<R, M::File>::write(file_path, state, &context, None).unwrap();
        for _ in 0..BATCHES {
            let mut ids = (0..RECORDS_PER_BATCH)
                .map(|_| rng.generate::<u64>())
                .collect::<Vec<_>>();
            ids.sort_unstable();
            let modification = Modification {
                persistence_mode: PersistenceMode::Sync,
                keys: ids
                    .iter()
                    .map(|id| ArcBytes::from(id.to_be_bytes().to_vec()))
                    .collect(),
                operation: Operation::Set(ArcBytes::from(b"hello world")),
            };
            tree.modify(modification).unwrap();

            // Try five random gets.
            for _ in 0..5 {
                let index = rng.generate_range(0..ids.len());
                let id = ArcBytes::from(ids[index].to_be_bytes().to_vec());
                let value = tree.get(&id, false).unwrap();
                assert_eq!(&*value.unwrap(), b"hello world");
            }
        }
    }

    #[test]
    fn batch_get() {
        let context = Context {
            file_manager: MemoryFileManager::default(),
            vault: None,
            cache: None,
        };
        let state = State::default();
        let mut tree =
            TreeFile::<Versioned, MemoryFile>::write("test", state, &context, None).unwrap();
        // Create enough records to go 4 levels deep.
        let mut ids = Vec::new();
        for id in 0..3_u32.pow(4) {
            let id_buffer = ArcBytes::from(id.to_be_bytes().to_vec());
            tree.set(None, id_buffer.clone(), id_buffer.clone())
                .unwrap();
            ids.push(id_buffer);
        }

        // Get them all.
        let mut all_records = tree
            .get_multiple(ids.iter().map(ArcBytes::as_slice), false)
            .unwrap();
        // Order isn't guaranteed.
        all_records.sort();
        assert_eq!(
            all_records
                .iter()
                .map(|kv| kv.1.clone())
                .collect::<Vec<_>>(),
            ids
        );

        // Try some ranges.
        let mut unbounded_to_five = tree.get_range(&(..ids[5].as_slice()), false).unwrap();
        unbounded_to_five.sort();
        assert_eq!(&all_records[..5], &unbounded_to_five);
        let mut one_to_ten_unbounded = tree
            .get_range(&(ids[1].as_slice()..ids[10].as_slice()), false)
            .unwrap();
        one_to_ten_unbounded.sort();
        assert_eq!(&all_records[1..10], &one_to_ten_unbounded);
        let mut bounded_upper = tree
            .get_range(&(ids[3].as_slice()..=ids[50].as_slice()), false)
            .unwrap();
        bounded_upper.sort();
        assert_eq!(&all_records[3..=50], &bounded_upper);
        let mut unbounded_upper = tree.get_range(&(ids[60].as_slice()..), false).unwrap();
        unbounded_upper.sort();
        assert_eq!(&all_records[60..], &unbounded_upper);
        let mut all_through_scan = tree.get_range(&(..), false).unwrap();
        all_through_scan.sort();
        assert_eq!(&all_records, &all_through_scan);
    }

    fn compact<R: Root<Value = ArcBytes<'static>> + Default, M: FileManager>(
        label: &str,
        file_manager: M,
    ) {
        const ORDER: usize = 4;
        let mut rng = Pcg64::new_seed(1);
        let context = Context {
            file_manager,
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("btree-compact-{}", label));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");
        let mut ids = HashSet::new();
        for _ in 0..5 {
            insert_one_record::<R, M::File>(&context, &file_path, &mut ids, &mut rng, Some(ORDER));
        }

        let mut tree =
            TreeFile::<R, M::File>::write(&file_path, State::default(), &context, None).unwrap();
        let pre_compact_size = context.file_manager.file_length(&file_path).unwrap();
        tree = tree.compact(&context.file_manager, None).unwrap();
        let after_compact_size = context.file_manager.file_length(&file_path).unwrap();
        assert!(
            after_compact_size < pre_compact_size,
            "compact didn't remove any data"
        );

        // Try fetching all the records to ensure they're still present.
        for id in ids {
            let id_buffer = ArcBytes::from(id.to_be_bytes().to_vec());
            tree.get(&id_buffer, false)
                .unwrap()
                .expect("no value found");
        }
    }

    #[test]
    fn std_compact_versioned() {
        compact::<Versioned, _>("versioned", StdFileManager::default());
    }

    #[test]
    fn std_compact_unversioned() {
        compact::<Unversioned, _>("unversioned", StdFileManager::default());
    }

    #[test]
    fn memory_compact_versioned() {
        compact::<Versioned, _>("versioned", MemoryFileManager::default());
    }

    #[test]
    fn memory_compact_unversioned() {
        compact::<Unversioned, _>("unversioned", MemoryFileManager::default());
    }

    #[test]
    fn any_compact_versioned() {
        compact::<Versioned, _>("any-versioned", AnyFileManager::std());
        compact::<Versioned, _>("any-versioned", AnyFileManager::memory());
    }

    #[test]
    fn any_compact_unversioned() {
        compact::<Unversioned, _>("any-unversioned", AnyFileManager::std());
        compact::<Unversioned, _>("any-unversioned", AnyFileManager::memory());
    }

    #[test]
    fn revision_history() {
        let context = Context {
            file_manager: StdFileManager::default(),
            vault: None,
            cache: None,
        };
        let state = State::default();
        let tempfile = NamedTempFile::new().unwrap();
        let mut tree =
            TreeFile::<Versioned, StdFile>::write(tempfile.path(), state, &context, None).unwrap();

        // Store three versions of the same key.
        tree.set(None, ArcBytes::from(b"a"), ArcBytes::from(b"0"))
            .unwrap();
        tree.set(None, ArcBytes::from(b"a"), ArcBytes::from(b"1"))
            .unwrap();
        tree.set(None, ArcBytes::from(b"a"), ArcBytes::from(b"2"))
            .unwrap();

        // Retrieve the sequences.
        let mut sequences = Vec::new();
        tree.scan_sequences::<Infallible, _, _, _>(
            ..,
            true,
            false,
            |_| ScanEvaluation::ReadData,
            |sequence, value| {
                sequences.push((sequence, value));
                Ok(())
            },
        )
        .unwrap();
        assert_eq!(sequences.len(), 3);
        sequences.sort_by(|a, b| a.0.sequence.cmp(&b.0.sequence));
        assert!(sequences.iter().all(|s| s.0.key.as_slice() == b"a"));
        assert_eq!(sequences[0].0.last_sequence, None);
        assert_eq!(sequences[1].0.last_sequence, Some(sequences[0].0.sequence));
        assert_eq!(sequences[2].0.last_sequence, Some(sequences[1].0.sequence));

        assert_eq!(sequences[0].1, b"0");
        assert_eq!(sequences[1].1, b"1");
        assert_eq!(sequences[2].1, b"2");
    }

    struct ExtendToPageBoundaryPlus(u64);

    impl FileOp<()> for ExtendToPageBoundaryPlus {
        #[allow(clippy::cast_possible_truncation)]
        fn execute(self, file: &mut dyn File) {
            let length = file.length().unwrap();
            let bytes = vec![42_u8; (length - (length % PAGE_SIZE as u64) + self.0) as usize];
            file.write_all(&bytes).unwrap();
        }
    }

    #[test]
    fn header_incompatible() {
        let context = Context {
            file_manager: MemoryFileManager::default(),
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new("header_incompatible");
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");

        // Write some data using unversioned.
        {
            let state = State::default();
            let mut tree =
                TreeFile::<Unversioned, MemoryFile>::write(&file_path, state, &context, None)
                    .unwrap();
            tree.set(
                None,
                ArcBytes::from(b"test"),
                ArcBytes::from(b"hello world"),
            )
            .unwrap();
        }
        // Try reading it as versioned.
        let mut file = context.file_manager.append(&file_path).unwrap();
        file.execute(ExtendToPageBoundaryPlus(3));
        let state = State::default();
        assert!(matches!(
            TreeFile::<Versioned, MemoryFile>::write(&file_path, state, &context, None)
                .unwrap_err()
                .kind,
            ErrorKind::DataIntegrity(_)
        ));
    }

    #[test]
    fn file_length_page_offset_plus_a_little() {
        let context = Context {
            file_manager: MemoryFileManager::default(),
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new("page-header-edge-cases");
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");

        // Write some data.
        {
            let state = State::default();
            let mut tree =
                TreeFile::<Unversioned, MemoryFile>::write(&file_path, state, &context, None)
                    .unwrap();
            tree.set(
                None,
                ArcBytes::from(b"test"),
                ArcBytes::from(b"hello world"),
            )
            .unwrap();
        }
        // Test when the file's length is less than 4 bytes longer than a
        // multiple of PAGE_SIZE.
        let mut file = context.file_manager.append(&file_path).unwrap();
        file.execute(ExtendToPageBoundaryPlus(3));
        let state = State::default();
        let mut tree =
            TreeFile::<Unversioned, MemoryFile>::write(&file_path, state, &context, None).unwrap();

        assert_eq!(tree.get(b"test", false).unwrap().unwrap(), b"hello world");
    }

            
    /// Exercises `compare_and_swap` and `replace`: conflicting swaps must
    /// fail, matching swaps must succeed, and swapping to `None` removes the
    /// key.
    fn edit_keys<R: Root<Value = ArcBytes<'static>> + Default, M: FileManager>(
        label: &str,
        file_manager: M,
    ) {
        let context = Context {
            file_manager,
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("edit-keys-{}", label));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");

        let mut tree =
            TreeFile::<R, M::File>::write(file_path, State::default(), &context, None).unwrap();
        assert!(matches!(
            tree.compare_and_swap(b"test", Some(&b"won't match"[..]), None, None)
                .unwrap_err(),
            CompareAndSwapError::Conflict(_)
        ));
        tree.compare_and_swap(b"test", None, Some(ArcBytes::from(b"first")), None)
            .unwrap();
        assert!(matches!(
            tree.compare_and_swap(b"test", Some(&b"won't match"[..]), None, None)
                .unwrap_err(),
            CompareAndSwapError::Conflict(_)
        ));
        tree.compare_and_swap(
            b"test",
            Some(&b"first"[..]),
            Some(ArcBytes::from(b"second")),
            None,
        )
        .unwrap();

        let stored = tree.replace(b"test", b"third", None).unwrap().0.unwrap();
        assert_eq!(stored, b"second");

        tree.compare_and_swap(b"test", Some(b"third"), None, None)
            .unwrap();
        assert!(tree.get(b"test", false).unwrap().is_none());
    }

    #[test]
    fn std_edit_keys_versioned() {
        edit_keys::<Versioned, _>("versioned", StdFileManager::default());
    }

    #[test]
    fn std_edit_keys_unversioned() {
        edit_keys::<Unversioned, _>("unversioned", StdFileManager::default());
    }

    #[test]
    fn memory_edit_keys_versioned() {
        edit_keys::<Versioned, _>("versioned", MemoryFileManager::default());
    }

    #[test]
    fn memory_edit_keys_unversioned() {
        edit_keys::<Unversioned, _>("unversioned", MemoryFileManager::default());
    }

    #[test]
    fn any_edit_keys_versioned() {
        edit_keys::<Versioned, _>("any-versioned", AnyFileManager::std());
        edit_keys::<Versioned, _>("any-versioned", AnyFileManager::memory());
    }

    #[test]
    fn any_edit_keys_unversioned() {
        edit_keys::<Unversioned, _>("any-unversioned", AnyFileManager::std());
        edit_keys::<Unversioned, _>("any-unversioned", AnyFileManager::memory());
    }

    #[test]
    fn reduce() {
        // `RangeBounds` has no standard range syntax for an excluded start
        // bound, so this helper provides one for the assertions below.
        #[derive(Debug)]
        struct ExcludedStart<'a>(&'a [u8]);

        impl<'a> RangeBounds<&'a [u8]> for ExcludedStart<'a> {
            fn start_bound(&self) -> Bound<&&'a [u8]> {
                Bound::Excluded(&self.0)
            }

            fn end_bound(&self) -> Bound<&&'a [u8]> {
                Bound::Unbounded
            }
        }

        let context = Context {
            file_manager: StdFileManager::default(),
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new("reduce");
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");

        // A small maximum order forces the 256 keys into a multi-level tree.
        let mut tree = TreeFile::<Unversioned, StdFile>::write(
            &file_path,
            State::new(None, Some(4), Unversioned::default()),
            &context,
            None,
        )
        .unwrap();
        // Store one entry for each possible single-byte key.
        for i in 0..=u8::MAX {
            let bytes = ArcBytes::from([i]);
            tree.set(None, bytes.clone(), bytes.clone()).unwrap();
        }

        assert_eq!(tree.reduce(&(..), false).unwrap().unwrap().alive_keys, 256);
        assert_eq!(
            tree.reduce(&ExcludedStart(&[0]), false)
                .unwrap()
                .unwrap()
                .alive_keys,
            255
        );
        assert_eq!(
            tree.reduce(&(&[0][..]..&[u8::MAX][..]), false)
                .unwrap()
                .unwrap()
                .alive_keys,
            255
        );
        assert_eq!(
            tree.reduce(&(&[1][..]..=&[100][..]), false)
                .unwrap()
                .unwrap()
                .alive_keys,
            100
        );
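
        // With one entry stored per byte value, any half-open range
        // `[start, end)` covers exactly `end - start` alive keys; the loop
        // below verifies every such range.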
        for start in 0..u8::MAX {
            for end in start + 1..=u8::MAX {
                assert_eq!(
                    tree.reduce(&(&[start][..]..&[end][..]), false)
                        .unwrap()
                        .unwrap()
                        .alive_keys,
                    u64::from(end - start)
                );
            }
        }
    }

    /// Verifies that `first_key`/`first` and `last_key`/`last` return the
    /// lexicographically smallest and largest entries.
    fn first_last<R: Root<Value = ArcBytes<'static>> + Default, M: FileManager>(
        label: &str,
        file_manager: M,
    ) {
        let context = Context {
            file_manager,
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("first-last-{}", label));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");

        let mut tree =
            TreeFile::<R, M::File>::write(file_path, State::default(), &context, None).unwrap();
        tree.set(None, ArcBytes::from(b"a"), ArcBytes::from(b"first"))
            .unwrap();
        tree.set(None, ArcBytes::from(b"z"), ArcBytes::from(b"last"))
            .unwrap();

        assert_eq!(tree.first_key(false).unwrap().unwrap(), b"a");
        let (key, value) = tree.first(false).unwrap().unwrap();
        assert_eq!(key, b"a");
        assert_eq!(value, b"first");

        assert_eq!(tree.last_key(false).unwrap().unwrap(), b"z");
        let (key, value) = tree.last(false).unwrap().unwrap();
        assert_eq!(key, b"z");
        assert_eq!(value, b"last");
    }

    #[test]
    fn std_first_last_versioned() {
        first_last::<Versioned, _>("versioned", StdFileManager::default());
    }

    #[test]
    fn std_first_last_unversioned() {
        first_last::<Unversioned, _>("unversioned", StdFileManager::default());
    }

    #[test]
    fn memory_first_last_versioned() {
        first_last::<Versioned, _>("versioned", MemoryFileManager::default());
    }

    #[test]
    fn memory_first_last_unversioned() {
        first_last::<Unversioned, _>("unversioned", MemoryFileManager::default());
    }

    #[test]
    fn any_first_last_versioned() {
        first_last::<Versioned, _>("any-versioned", AnyFileManager::std());
        first_last::<Versioned, _>("any-versioned", AnyFileManager::memory());
    }

    #[test]
    fn any_first_last_unversioned() {
        first_last::<Unversioned, _>("any-unversioned", AnyFileManager::std());
        first_last::<Unversioned, _>("any-unversioned", AnyFileManager::memory());
    }

    /// Stress-tests `modify()` with large `CompareSwap` batches that mix
    /// insertions and removals, mirroring the expected contents in a
    /// `HashMap`.
    fn bulk_compare_swaps<R: Root<Value = ArcBytes<'static>> + Default, M: FileManager>(
        label: &str,
        file_manager: M,
    ) {
        const BATCH: usize = 10_000;
        let context = Context {
            file_manager,
            vault: None,
            cache: None,
        };
        let temp_dir = crate::test_util::TestDirectory::new(format!("bulk-swap-{}", label));
        std::fs::create_dir(&temp_dir).unwrap();
        let file_path = temp_dir.join("tree");

        let mut tree =
            TreeFile::<R, M::File>::write(&file_path, State::default(), &context, None).unwrap();
        let mut rng = Pcg64::new_seed(1);

        let mut database_state = HashMap::new();
        for index in 1..=10 {
            println!("Batch {index}");
            // Generate a series of operations by randomly inserting or
            // deleting keys. Because the keyspace is u32, this first loop will
            // mostly append records.
            let mut batch = Vec::new();
            let mut operated_keys = HashSet::new();
            while batch.len() < BATCH {
                let key = ArcBytes::from(rng.generate::<u32>().to_be_bytes());
                let key_state = database_state.entry(key.clone()).or_insert(false);
                if operated_keys.insert(key.clone()) {
                    batch.push((key, *key_state));
                    *key_state = !*key_state;
                }
            }

            // Half of the time, expire a significant number of keys, allowing
            // our absorption rules to apply. We make sure not to consider any
            // keys that are already in the list above, as `modify()` can't
            // modify the same key twice.
            if rng.generate::<f32>() < 0.5 {
                for (key, key_state) in &mut database_state {
                    if *key_state
                        && operated_keys.insert(key.clone())
                        && rng.generate::<f32>() < 0.75
                    {
                        batch.push((key.clone(), true));
                        *key_state = !*key_state;
                    }
                }
            }

            let key_operations = batch
                .iter()
                .cloned()
                .collect::<BTreeMap<ArcBytes<'static>, bool>>();
            tree.modify(Modification {
                persistence_mode: PersistenceMode::Sync,
                keys: key_operations.keys().cloned().collect(),
                operation: Operation::CompareSwap(CompareSwap::new(
                    &mut |key, _index, existing_value| {
                        let should_remove = *key_operations.get(key).unwrap();
                        if should_remove {
                            assert!(
                                existing_value.is_some(),
                                "key {key:?} had no existing value"
                            );
                            KeyOperation::Remove
                        } else {
                            assert!(existing_value.is_none(), "key {key:?} already had a value");
                            KeyOperation::Set(key.to_owned())
                        }
                    },
                )),
            })
            .unwrap();
        }
    }

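    // `Pcg64::new_seed(1)` seeds the generator above deterministically, so
    // failures in these stress tests reproduce across runs.
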
    #[test]
    fn std_bulk_compare_swaps_unversioned() {
        bulk_compare_swaps::<Unversioned, _>("unversioned", StdFileManager::default());
    }

    #[test]
    fn std_bulk_compare_swaps_versioned() {
        bulk_compare_swaps::<Versioned, _>("versioned", StdFileManager::default());
    }
}