1
use std::{
2
    collections::HashMap,
3
    convert::Infallible,
4
    fmt::{Debug, Display},
5
    marker::PhantomData,
6
    ops::{Bound, RangeBounds},
7
};
8

            
9
use byteorder::{ReadBytesExt, WriteBytesExt};
10

            
11
use super::{
12
    interior::Interior,
13
    key_entry::KeyEntry,
14
    modify::{Modification, Operation},
15
    serialization::BinarySerialization,
16
    ChangeResult, KeyRange, PagedWriter,
17
};
18
use crate::{
19
    chunk_cache::CacheEntry,
20
    error::Error,
21
    io::File,
22
    tree::{key_entry::PositionIndex, read_chunk, versioned::Children, ScanEvaluation},
23
    vault::AnyVault,
24
    AbortError, ArcBytes, ChunkCache, ErrorKind,
25
};
26

            
27
/// A B-Tree entry that stores a list of key-`Index` pairs.
28
173385
#[derive(Clone, Debug)]
29
pub struct BTreeEntry<Index, ReducedIndex> {
30
    /// Whether the node contains unsaved changes.
31
    pub dirty: bool,
32
    /// The B-Tree node.
33
    pub node: BTreeNode<Index, ReducedIndex>,
34
}
35

            
36
/// A B-Tree entry that stores a list of key-`Index` pairs.
37
173385
#[derive(Clone, Debug)]
38
pub enum BTreeNode<Index, ReducedIndex> {
39
    /// An uninitialized node. Only used temporarily during modification.
40
    Uninitialized,
41
    /// An inline value. Overall, the B-Tree entry is a key-value pair.
42
    Leaf(Vec<KeyEntry<Index>>),
43
    /// An interior node that contains pointers to other nodes.
44
    Interior(Vec<Interior<Index, ReducedIndex>>),
45
}
46

            
47
impl<Index, ReducedIndex> From<BTreeNode<Index, ReducedIndex>> for BTreeEntry<Index, ReducedIndex> {
    /// Wraps `node` in a new entry. The entry starts out marked as dirty.
    fn from(node: BTreeNode<Index, ReducedIndex>) -> Self {
        Self { node, dirty: true }
    }
}
52

            
53
impl<Index, ReducedIndex> Default for BTreeEntry<Index, ReducedIndex> {
54
19832
    fn default() -> Self {
55
19832
        Self::from(BTreeNode::Leaf(Vec::new()))
56
19832
    }
57
}
58

            
59
/// Reduces one or more `Index`es or `ReducedIndex`es into a single
/// `ReducedIndex` value.
///
/// This trait is used to collect statistics within the B-Tree. The `Index`
/// value is stored at the [`KeyEntry`] level, and each [`Interior`] entry
/// contains a `ReducedIndex`, which is calculated by calling
/// [`Reducer::reduce()`] on all stored `Index`es.
///
/// When an `Interior` node points to other interior nodes, it calculates the
/// stored value by calling [`Reducer::rereduce()`] with all the stored
/// `ReducedIndex` values.
pub trait Reducer<Index, ReducedIndex = Index>: Debug + Clone + Send + Sync {
    /// Reduces one or more indexes into a single reduced index.
    ///
    /// `indexes` yields borrowed `Index` values and reports its exact length.
    fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ReducedIndex
    where
        Index: 'a,
        Indexes: IntoIterator<Item = &'a Index, IntoIter = IndexesIter> + ExactSizeIterator,
        IndexesIter: Iterator<Item = &'a Index> + ExactSizeIterator + Clone;

    /// Reduces one or more previously-reduced indexes into a single reduced index.
    ///
    /// `values` yields borrowed `ReducedIndex` values and reports its exact
    /// length.
    fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(
        &self,
        values: ReducedIndexes,
    ) -> ReducedIndex
    where
        Self: 'a,
        ReducedIndex: 'a,
        ReducedIndexes: IntoIterator<Item = &'a ReducedIndex, IntoIter = ReducedIndexesIter>
            + ExactSizeIterator,
        ReducedIndexesIter: Iterator<Item = &'a ReducedIndex> + ExactSizeIterator + Clone;
}
90

            
91
/// Creates an `Index` from a key and value.
92
pub trait Indexer<Value, Index>: Debug + Send + Sync {
93
    /// Index the key and value.
94
    fn index(&self, key: &ArcBytes<'_>, value: Option<&Value>) -> Index;
95
}
96

            
97
impl<Index> Reducer<Index, ()> for () {
98
282823
    fn reduce<'a, Indexes, IndexesIter>(&self, _indexes: Indexes) -> Self
99
282823
    where
100
282823
        Index: 'a,
101
282823
        Indexes: IntoIterator<Item = &'a Index, IntoIter = IndexesIter> + ExactSizeIterator,
102
282823
        IndexesIter: Iterator<Item = &'a Index> + ExactSizeIterator + Clone,
103
282823
    {
104
282823
    }
105

            
106
204829
    fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(&self, _values: ReducedIndexes) -> Self
107
204829
    where
108
204829
        Self: 'a,
109
204829
        ReducedIndexes:
110
204829
            IntoIterator<Item = &'a (), IntoIter = ReducedIndexesIter> + ExactSizeIterator,
111
204829
        ReducedIndexesIter: Iterator<Item = &'a ()> + ExactSizeIterator + Clone,
112
204829
    {
113
204829
    }
114
}
115

            
116
/// A context for a modification of a [`BTreeNode`].
117
pub struct ModificationContext<Value, Index, ReducedIndex, Indexer, Loader, IndexReducer>
118
where
119
    Indexer: FnMut(
120
        &ArcBytes<'_>,
121
        Option<&Value>,
122
        Option<&Index>,
123
        &mut PagedWriter<'_>,
124
    ) -> Result<KeyOperation<Index>, Error>,
125
    IndexReducer: Reducer<Index, ReducedIndex>,
126
    Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<Value>, Error>,
127
{
128
    /// The maximum number of children allowed per node.
129
    pub current_order: usize,
130
    /// The minimum number of children allowed per node.
131
    pub minimum_children: usize,
132
    /// The indexing function that produces `Index`es for a given `Value`.
133
    ///
134
    /// The parameters to this function are:
135
    ///
136
    /// - The entry's key
137
    /// - The entry's value, if the value is present.
138
    /// - The existing `Index`, if the value was previously present.
139
    /// - The writer used to read and write chunks to the file.
140
    pub indexer: Indexer,
141
    /// A function that is called to load a `Value` from an `Index`.
142
    ///
143
    /// The parameters to this function are:
144
    ///
145
    /// - The index to load a value for.
146
    /// - The paged writer that can be used to read chunks from the file.
147
    pub loader: Loader,
148
    /// A [`Reducer`] that reduces many `Index`es or many `ReducedIndex`es into
149
    /// a single `ReducedIndex` value.
150
    pub reducer: IndexReducer,
151
    _phantom: PhantomData<(Value, Index, ReducedIndex)>,
152
}
153

            
154
impl<Value, Index, ReducedIndex, Indexer, Loader, IndexReducer>
155
    ModificationContext<Value, Index, ReducedIndex, Indexer, Loader, IndexReducer>
156
where
157
    Indexer: FnMut(
158
        &ArcBytes<'_>,
159
        Option<&Value>,
160
        Option<&Index>,
161
        &mut PagedWriter<'_>,
162
    ) -> Result<KeyOperation<Index>, Error>,
163
    IndexReducer: Reducer<Index, ReducedIndex>,
164
    Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<Value>, Error>,
165
{
166
    /// Returns a new context.
167
64118
    pub fn new(
168
64118
        current_order: usize,
169
64118
        minimum_children: usize,
170
64118
        indexer: Indexer,
171
64118
        loader: Loader,
172
64118
        reducer: IndexReducer,
173
64118
    ) -> Self {
174
64118
        Self {
175
64118
            current_order,
176
64118
            minimum_children,
177
64118
            indexer,
178
64118
            loader,
179
64118
            reducer,
180
64118
            _phantom: PhantomData,
181
64118
        }
182
64118
    }
183
}
184

            
185
// Asserts that `$children` (anything offering `windows()` whose elements have
// a `key` field) is sorted by key in ascending order. Equal neighbouring keys
// are accepted. On failure, the first out-of-order pair of keys is reported.
//
// The check is only compiled in debug builds or when the `paranoid` feature is
// enabled; otherwise the macro expands to nothing.
#[cfg(any(debug_assertions, feature = "paranoid"))]
macro_rules! assert_children_order {
    ($children:expr) => {
        assert_eq!(
            $children
                .windows(2)
                .find_map(|w| (w[0].key > w[1].key).then(|| (&w[0].key, &w[1].key))),
            None,
            "children must be sorted by key in ascending order"
        );
    };
}
#[cfg(not(any(debug_assertions, feature = "paranoid")))]
macro_rules! assert_children_order {
    ($children:expr) => {};
}
200

            
201
impl<Index, ReducedIndex> BTreeEntry<Index, ReducedIndex>
202
where
203
    Index: PositionIndex + Clone + BinarySerialization + Debug + 'static,
204
    ReducedIndex: Clone + BinarySerialization + Debug + 'static,
205
{
206
    /// Modifies this entry. This function may not apply all of `modification`,
207
    /// as the entry may enter a state in which the parent of this entry needs
208
    /// to act upon. For example, if this node grows too large, the modification
209
    /// operation will return [`ChangeResult::Split`].
210
    ///
211
    /// When each operation is performed, it is removed from `modification`. To
212
    /// ensure all modifications are executed, call this functino repeatedly
213
    /// until `modification` has no keys remaining.
214
951548
    pub fn modify<IndexedType, Indexer, Loader, IndexReducer>(
215
951548
        &mut self,
216
951548
        modification: &mut Modification<'_, IndexedType, Index>,
217
951548
        context: &mut ModificationContext<
218
951548
            IndexedType,
219
951548
            Index,
220
951548
            ReducedIndex,
221
951548
            Indexer,
222
951548
            Loader,
223
951548
            IndexReducer,
224
951548
        >,
225
951548
        max_key: Option<&ArcBytes<'_>>,
226
951548
        writer: &mut PagedWriter<'_>,
227
951548
    ) -> Result<ChangeResult, Error>
228
951548
    where
229
951548
        Indexer: FnMut(
230
951548
            &ArcBytes<'_>,
231
951548
            Option<&IndexedType>,
232
951548
            Option<&Index>,
233
951548
            &mut PagedWriter<'_>,
234
951548
        ) -> Result<KeyOperation<Index>, Error>,
235
951548
        IndexReducer: Reducer<Index, ReducedIndex>,
236
951548
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
237
951548
    {
238
951548
        match &mut self.node {
239
416110
            BTreeNode::Leaf(children) => {
240
416110
                if Self::modify_leaf(children, modification, context, max_key, writer)? {
241
416094
                    self.dirty = true;
242
416094
                    Ok(Self::clean_up_leaf(
243
416094
                        children,
244
416094
                        context.current_order,
245
416094
                        context.minimum_children,
246
416094
                    ))
247
                } else {
248
16
                    Ok(ChangeResult::Unchanged)
249
                }
250
            }
251
535438
            BTreeNode::Interior(children) => {
252
535438
                match Self::modify_interior(children, modification, context, max_key, writer)? {
253
                    ChangeResult::Changed => {
254
535438
                        self.dirty = true;
255
535438
                        Ok(Self::clean_up_interior(
256
535438
                            children,
257
535438
                            context.current_order,
258
535438
                            context.minimum_children,
259
535438
                        ))
260
                    }
261
                    other => Ok(other),
262
                }
263
            }
264
            BTreeNode::Uninitialized => unreachable!(),
265
        }
266
951548
    }
267

            
268
417696
    fn clean_up_leaf(
269
417696
        children: &mut [KeyEntry<Index>],
270
417696
        current_order: usize,
271
417696
        minimum_children: usize,
272
417696
    ) -> ChangeResult {
273
417696
        let child_count = children.len();
274
9696921
        assert_children_order!(children);
275

            
276
417696
        if child_count > current_order {
277
96158
            ChangeResult::Split
278
321538
        } else if child_count == 0 {
279
298
            ChangeResult::Remove
280
321240
        } else if child_count < minimum_children {
281
1602
            ChangeResult::Absorb
282
        } else {
283
319638
            ChangeResult::Changed
284
        }
285
417696
    }
286

            
287
535549
    fn clean_up_interior(
288
535549
        children: &mut [Interior<Index, ReducedIndex>],
289
535549
        current_order: usize,
290
535549
        minimum_children: usize,
291
535549
    ) -> ChangeResult {
292
535549
        let child_count = children.len();
293
15511912
        assert_children_order!(children);
294

            
295
535549
        if child_count > current_order {
296
44876
            ChangeResult::Split
297
490673
        } else if child_count == 0 {
298
100
            ChangeResult::Remove
299
490573
        } else if child_count < minimum_children {
300
20763
            ChangeResult::Absorb
301
        } else {
302
469810
            ChangeResult::Changed
303
        }
304
535549
    }
305

            
306
    #[allow(clippy::too_many_lines)] // TODO refactor, too many lines
307
416110
    fn modify_leaf<IndexedType, Indexer, Loader, IndexReducer>(
308
416110
        children: &mut Vec<KeyEntry<Index>>,
309
416110
        modification: &mut Modification<'_, IndexedType, Index>,
310
416110
        context: &mut ModificationContext<
311
416110
            IndexedType,
312
416110
            Index,
313
416110
            ReducedIndex,
314
416110
            Indexer,
315
416110
            Loader,
316
416110
            IndexReducer,
317
416110
        >,
318
416110
        max_key: Option<&ArcBytes<'_>>,
319
416110
        writer: &mut PagedWriter<'_>,
320
416110
    ) -> Result<bool, Error>
321
416110
    where
322
416110
        Indexer: FnMut(
323
416110
            &ArcBytes<'_>,
324
416110
            Option<&IndexedType>,
325
416110
            Option<&Index>,
326
416110
            &mut PagedWriter<'_>,
327
416110
        ) -> Result<KeyOperation<Index>, Error>,
328
416110
        IndexReducer: Reducer<Index, ReducedIndex>,
329
416110
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
330
416110
    {
331
416110
        let mut last_index = 0;
332
416110
        let mut any_changes = false;
333
416110
        let max_len = children.len().max(context.current_order);
334
1143196
        while !modification.keys.is_empty() && children.len() <= max_len {
335
1006421
            let key = modification.keys.last().unwrap();
336
1006421
            if max_key.map(|max_key| key > max_key).unwrap_or_default() {
337
279335
                break;
338
727086
            }
339
727086

            
340
2821320
            let search_result = children[last_index..].binary_search_by(|child| child.key.cmp(key));
341
727086
            match search_result {
342
167077
                Ok(matching_index) => {
343
167077
                    let key = modification.keys.pop().unwrap();
344
167077
                    last_index += matching_index;
345
167077
                    let index = match &mut modification.operation {
346
4100
                        Operation::Set(value) => (context.indexer)(
347
4100
                            &key,
348
4100
                            Some(value),
349
4100
                            Some(&children[last_index].index),
350
4100
                            writer,
351
4100
                        )?,
352
26
                        Operation::SetEach(values) => (context.indexer)(
353
26
                            &key,
354
26
                            Some(&values.pop().ok_or_else(|| {
355
                                ErrorKind::message("need the same number of keys as values")
356
26
                            })?),
357
26
                            Some(&children[last_index].index),
358
26
                            writer,
359
                        )?,
360
2000
                        Operation::Remove => (context.indexer)(
361
2000
                            &key,
362
2000
                            None,
363
2000
                            Some(&children[last_index].index),
364
2000
                            writer,
365
2000
                        )?,
366
160951
                        Operation::CompareSwap(callback) => {
367
160951
                            let current_index = &children[last_index].index;
368
160951
                            let existing_value = (context.loader)(current_index, writer)?;
369
160951
                            match callback(&key, Some(current_index), existing_value) {
370
8
                                KeyOperation::Skip => KeyOperation::Skip,
371
17
                                KeyOperation::Set(new_value) => (context.indexer)(
372
17
                                    &key,
373
17
                                    Some(&new_value),
374
17
                                    Some(current_index),
375
17
                                    writer,
376
17
                                )?,
377
                                KeyOperation::Remove => {
378
160926
                                    (context.indexer)(&key, None, Some(current_index), writer)?
379
                                }
380
                            }
381
                        }
382
                    };
383

            
384
167077
                    match index {
385
8
                        KeyOperation::Skip => {}
386
85606
                        KeyOperation::Set(index) => {
387
85606
                            children[last_index] = KeyEntry {
388
85606
                                key: key.into_owned(),
389
85606
                                index,
390
85606
                            };
391
85606
                            any_changes = true;
392
85606
                        }
393
81463
                        KeyOperation::Remove => {
394
81463
                            children.remove(last_index);
395
81463
                            any_changes = true;
396
81463
                        }
397
                    }
398
                }
399
560009
                Err(insert_at) => {
400
560009
                    last_index += insert_at;
401
560009
                    let key = modification.keys.pop().unwrap();
402
560009
                    let operation = match &mut modification.operation {
403
97706
                        Operation::Set(new_value) => {
404
97706
                            (context.indexer)(&key, Some(new_value), None, writer)?
405
                        }
406
262288
                        Operation::SetEach(new_values) => (context.indexer)(
407
262288
                            &key,
408
262288
                            Some(&new_values.pop().ok_or_else(|| {
409
                                ErrorKind::message("need the same number of keys as values")
410
262288
                            })?),
411
262288
                            None,
412
262288
                            writer,
413
                        )?,
414
                        Operation::Remove => {
415
                            // The key doesn't exist, so a remove is a no-op.
416
                            KeyOperation::Remove
417
                        }
418
200015
                        Operation::CompareSwap(callback) => match callback(&key, None, None) {
419
8
                            KeyOperation::Skip => KeyOperation::Skip,
420
200007
                            KeyOperation::Set(new_value) => {
421
200007
                                (context.indexer)(&key, Some(&new_value), None, writer)?
422
                            }
423
                            KeyOperation::Remove => (context.indexer)(&key, None, None, writer)?,
424
                        },
425
                    };
426
560009
                    match operation {
427
560001
                        KeyOperation::Set(index) => {
428
560001
                            // New node.
429
560001
                            if children.capacity() < children.len() + 1
430
32400
                                && context.current_order > children.len()
431
23456
                            {
432
23456
                                children.reserve(context.current_order + 1);
433
536545
                            }
434
560001
                            children.insert(
435
560001
                                last_index,
436
560001
                                KeyEntry {
437
560001
                                    key: key.into_owned(),
438
560001
                                    index,
439
560001
                                },
440
560001
                            );
441
560001
                            any_changes = true;
442
                        }
443
8
                        KeyOperation::Skip | KeyOperation::Remove => {}
444
                    }
445
                }
446
            }
447
18624235
            assert_children_order!(children);
448
        }
449
416110
        Ok(any_changes)
450
416110
    }
451

            
452
535438
    fn modify_interior<IndexedType, Indexer, Loader, IndexReducer>(
453
535438
        children: &mut Vec<Interior<Index, ReducedIndex>>,
454
535438
        modification: &mut Modification<'_, IndexedType, Index>,
455
535438
        context: &mut ModificationContext<
456
535438
            IndexedType,
457
535438
            Index,
458
535438
            ReducedIndex,
459
535438
            Indexer,
460
535438
            Loader,
461
535438
            IndexReducer,
462
535438
        >,
463
535438
        max_key: Option<&ArcBytes<'_>>,
464
535438
        writer: &mut PagedWriter<'_>,
465
535438
    ) -> Result<ChangeResult, Error>
466
535438
    where
467
535438
        Indexer: FnMut(
468
535438
            &ArcBytes<'_>,
469
535438
            Option<&IndexedType>,
470
535438
            Option<&Index>,
471
535438
            &mut PagedWriter<'_>,
472
535438
        ) -> Result<KeyOperation<Index>, Error>,
473
535438
        IndexReducer: Reducer<Index, ReducedIndex>,
474
535438
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
475
535438
    {
476
535438
        let mut last_index = 0;
477
535438
        let mut any_changes = false;
478
1422868
        while let Some(key) = modification.keys.last().cloned() {
479
1290759
            if last_index >= children.len()
480
1290759
                || max_key.map(|max_key| &key > max_key).unwrap_or_default()
481
            {
482
403329
                break;
483
887430
            }
484
887430
            let (containing_node_index, pushing_end) = children[last_index..]
485
3076327
                .binary_search_by(|child| child.key.cmp(&key))
486
887430
                .map_or_else(
487
887430
                    |not_found| {
488
885804
                        if not_found > 0 && not_found + last_index == children.len() {
489
                            // If we can't find a key less than what would fit
490
                            // within our children, this key will become the new key
491
                            // of the last child.
492
633069
                            (not_found - 1, true)
493
                        } else {
494
252735
                            (not_found, false)
495
                        }
496
887430
                    },
497
887430
                    |found| (found, false),
498
887430
                );
499
887430

            
500
887430
            last_index += containing_node_index;
501
887430
            let child = &mut children[last_index];
502
887430
            child.position.load(
503
887430
                writer.file,
504
887430
                false,
505
887430
                writer.vault,
506
887430
                writer.cache,
507
887430
                Some(context.current_order),
508
887430
            )?;
509
887430
            let child_entry = child.position.get_mut().unwrap();
510
887430
            let max_key_for_modification = if pushing_end {
511
                // The new key is being added to the end of the node.
512
633069
                key
513
254361
            } else if let Some(max_key) = max_key {
514
199948
                child.key.clone().min(max_key.clone())
515
            } else {
516
54413
                child.key.clone()
517
            };
518
887430
            let (change_result, should_backup) = Self::process_interior_change_result(
519
887430
                child_entry.modify(
520
887430
                    modification,
521
887430
                    context,
522
887430
                    Some(&max_key_for_modification),
523
887430
                    writer,
524
887430
                )?,
525
887430
                last_index,
526
887430
                children,
527
887430
                context,
528
887430
                writer,
529
            )?;
530
40321209
            assert_children_order!(children);
531
887430
            match change_result {
532
                ChangeResult::Unchanged => {}
533
                ChangeResult::Split => unreachable!(),
534
                ChangeResult::Absorb => {
535
                    return Ok(ChangeResult::Absorb);
536
                }
537
                ChangeResult::Remove => {
538
                    any_changes = true;
539
                }
540
887430
                ChangeResult::Changed => any_changes = true,
541
            }
542
887430
            if should_backup && last_index > 0 {
543
77636
                last_index -= 1;
544
809794
            }
545
        }
546
535438
        Ok(if any_changes {
547
535438
            ChangeResult::Changed
548
        } else {
549
            ChangeResult::Unchanged
550
        })
551
535438
    }
552

            
553
889143
    fn process_interior_change_result<IndexedType, Indexer, Loader, IndexReducer>(
554
889143
        result: ChangeResult,
555
889143
        child_index: usize,
556
889143
        children: &mut Vec<Interior<Index, ReducedIndex>>,
557
889143
        context: &ModificationContext<
558
889143
            IndexedType,
559
889143
            Index,
560
889143
            ReducedIndex,
561
889143
            Indexer,
562
889143
            Loader,
563
889143
            IndexReducer,
564
889143
        >,
565
889143
        writer: &mut PagedWriter<'_>,
566
889143
    ) -> Result<(ChangeResult, bool), Error>
567
889143
    where
568
889143
        Indexer: FnMut(
569
889143
            &ArcBytes<'_>,
570
889143
            Option<&IndexedType>,
571
889143
            Option<&Index>,
572
889143
            &mut PagedWriter<'_>,
573
889143
        ) -> Result<KeyOperation<Index>, Error>,
574
889143
        IndexReducer: Reducer<Index, ReducedIndex>,
575
889143
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
576
889143
    {
577
889143
        let can_absorb = children.len() > 1;
578
889143
        match (result, can_absorb) {
579
            (ChangeResult::Unchanged, _) => Ok((ChangeResult::Unchanged, false)),
580
            (ChangeResult::Changed, _) | (ChangeResult::Absorb, false) => {
581
751679
                let child = &mut children[child_index];
582
751679
                let child_entry = child.position.get_mut().unwrap();
583
751679
                child.key = child_entry.max_key().clone();
584
751679
                child.stats = child_entry.stats(&context.reducer);
585
751679
                Ok((ChangeResult::Changed, result == ChangeResult::Absorb))
586
            }
587
            (ChangeResult::Split, _) => {
588
135358
                Self::process_interior_split(child_index, children, context, writer)
589
            }
590
            (ChangeResult::Absorb, true) => {
591
1713
                Self::process_absorb(child_index, children, context, writer)
592
            }
593
            (ChangeResult::Remove, _) => {
594
393
                children.remove(child_index);
595
393

            
596
393
                Ok((ChangeResult::Changed, true))
597
            }
598
        }
599
889143
    }
600

            
601
1713
    fn process_absorb<IndexedType, Indexer, Loader, IndexReducer>(
602
1713
        child_index: usize,
603
1713
        children: &mut Vec<Interior<Index, ReducedIndex>>,
604
1713
        context: &ModificationContext<
605
1713
            IndexedType,
606
1713
            Index,
607
1713
            ReducedIndex,
608
1713
            Indexer,
609
1713
            Loader,
610
1713
            IndexReducer,
611
1713
        >,
612
1713
        writer: &mut PagedWriter<'_>,
613
1713
    ) -> Result<(ChangeResult, bool), Error>
614
1713
    where
615
1713
        Indexer: FnMut(
616
1713
            &ArcBytes<'_>,
617
1713
            Option<&IndexedType>,
618
1713
            Option<&Index>,
619
1713
            &mut PagedWriter<'_>,
620
1713
        ) -> Result<KeyOperation<Index>, Error>,
621
1713
        IndexReducer: Reducer<Index, ReducedIndex>,
622
1713
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
623
1713
    {
624
1713
        let (insert_on_top, sponge_index) = if child_index > 0 {
625
1670
            (true, child_index - 1)
626
        } else {
627
43
            (false, child_index)
628
        };
629

            
630
1713
        if sponge_index < children.len() - 1 {
631
1713
            let mut removed_child = children.remove(child_index);
632
1713
            let sponge = children.get_mut(sponge_index).unwrap();
633
1713

            
634
1713
            let removed_child = removed_child.position.get_mut().unwrap();
635
1713
            let leaves = match &mut removed_child.node {
636
1602
                BTreeNode::Leaf(leaves) => Children::Leaves(std::mem::take(leaves)),
637
111
                BTreeNode::Interior(interiors) => Children::Interiors(std::mem::take(interiors)),
638
                BTreeNode::Uninitialized => unreachable!(),
639
            };
640

            
641
1713
            sponge.position.load(
642
1713
                writer.file,
643
1713
                false,
644
1713
                writer.vault,
645
1713
                writer.cache,
646
1713
                Some(context.current_order),
647
1713
            )?;
648
1713
            let sponge_entry = sponge.position.get_mut().unwrap();
649
1713
            match Self::process_interior_change_result(
650
1713
                sponge_entry.absorb(
651
1713
                    leaves,
652
1713
                    insert_on_top,
653
1713
                    context.current_order,
654
1713
                    context.minimum_children,
655
1713
                    writer,
656
1713
                )?,
657
1713
                sponge_index,
658
1713
                children,
659
1713
                context,
660
1713
                writer,
661
            )? {
662
                (ChangeResult::Absorb | ChangeResult::Split | ChangeResult::Unchanged, _) => {
663
                    unreachable!()
664
                }
665
                (ChangeResult::Remove, _) => Ok((ChangeResult::Remove, true)),
666
1713
                (ChangeResult::Changed, _) => Ok((ChangeResult::Changed, true)),
667
            }
668
        } else {
669
            Ok((ChangeResult::Unchanged, false))
670
        }
671
1713
    }
672

            
673
135358
    fn process_interior_split<IndexedType, Indexer, Loader, IndexReducer>(
674
135358
        child_index: usize,
675
135358
        children: &mut Vec<Interior<Index, ReducedIndex>>,
676
135358
        context: &ModificationContext<
677
135358
            IndexedType,
678
135358
            Index,
679
135358
            ReducedIndex,
680
135358
            Indexer,
681
135358
            Loader,
682
135358
            IndexReducer,
683
135358
        >,
684
135358
        writer: &mut PagedWriter<'_>,
685
135358
    ) -> Result<(ChangeResult, bool), Error>
686
135358
    where
687
135358
        Indexer: FnMut(
688
135358
            &ArcBytes<'_>,
689
135358
            Option<&IndexedType>,
690
135358
            Option<&Index>,
691
135358
            &mut PagedWriter<'_>,
692
135358
        ) -> Result<KeyOperation<Index>, Error>,
693
135358
        IndexReducer: Reducer<Index, ReducedIndex>,
694
135358
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
695
135358
    {
696
135358
        let mut should_backup = false;
697
135358
        // Before adding a new node, we want to first try to use neighboring
698
135358
        // nodes to absorb enough such that a split is no longer needed. With
699
135358
        // the dynamic ordering used, we will likely end up with nodes that have
700
135358
        // less than the minimum number of nodes, if we were to clean them up.
701
135358
        // This stealing mechanism helps bring node utilization up without
702
135358
        // requiring much additional IO. Ultimately, the best time to optimize
703
135358
        // the tree will be during a compaction phase.
704
135358
        if child_index > 0 {
705
114557
            match Self::steal_children_from_start(child_index, children, context, writer)? {
706
39101
                (ChangeResult::Unchanged, steal_results_in_should_backup) => {
707
39101
                    should_backup = steal_results_in_should_backup;
708
39101
                }
709
75456
                (ChangeResult::Changed, should_backup) => {
710
75456
                    return Ok((ChangeResult::Changed, should_backup))
711
                }
712
                _ => unreachable!(),
713
            }
714
20801
        }
715

            
716
59902
        if child_index + 1 < children.len() {
717
46587
            match Self::steal_children_from_end(child_index, children, context, writer)? {
718
11046
                ChangeResult::Unchanged => {}
719
35541
                ChangeResult::Changed => return Ok((ChangeResult::Changed, should_backup)),
720
                _ => unreachable!(),
721
            }
722
13315
        }
723

            
724
24361
        let child = children[child_index].position.get_mut().unwrap();
725
24361
        child.dirty = true;
726
24361
        let next_node = match &mut child.node {
727
23233
            BTreeNode::Leaf(children) => {
728
23233
                let upper = children
729
23233
                    .splice((children.len() + 1) / 2.., std::iter::empty())
730
23233
                    .collect::<Vec<_>>();
731
23233
                Self::from(BTreeNode::Leaf(upper))
732
            }
733
1128
            BTreeNode::Interior(children) => {
734
1128
                let upper = children
735
1128
                    .splice((children.len() + 1) / 2.., std::iter::empty())
736
1128
                    .collect::<Vec<_>>();
737
1128
                Self::from(BTreeNode::Interior(upper))
738
            }
739
            BTreeNode::Uninitialized => unimplemented!(),
740
        };
741
24361
        let (max_key, stats) = { (child.max_key().clone(), child.stats(&context.reducer)) };
742
24361
        children[child_index].key = max_key;
743
24361
        children[child_index].stats = stats;
744
24361

            
745
24361
        children.insert(child_index + 1, Interior::new(next_node, &context.reducer));
746
24361

            
747
24361
        Ok((ChangeResult::Changed, should_backup))
748
135358
    }
749

            
750
114557
    fn steal_children_from_start<IndexedType, Indexer, Loader, IndexReducer>(
751
114557
        child_index: usize,
752
114557
        children: &mut [Interior<Index, ReducedIndex>],
753
114557
        context: &ModificationContext<
754
114557
            IndexedType,
755
114557
            Index,
756
114557
            ReducedIndex,
757
114557
            Indexer,
758
114557
            Loader,
759
114557
            IndexReducer,
760
114557
        >,
761
114557
        writer: &mut PagedWriter<'_>,
762
114557
    ) -> Result<(ChangeResult, bool), Error>
763
114557
    where
764
114557
        Indexer: FnMut(
765
114557
            &ArcBytes<'_>,
766
114557
            Option<&IndexedType>,
767
114557
            Option<&Index>,
768
114557
            &mut PagedWriter<'_>,
769
114557
        ) -> Result<KeyOperation<Index>, Error>,
770
114557
        IndexReducer: Reducer<Index, ReducedIndex>,
771
114557
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
772
114557
    {
773
114557
        let mut should_backup = false;
774
114557
        // Check the previous child to see if it can accept any of this child.
775
114557
        children[child_index - 1].position.load(
776
114557
            writer.file,
777
114557
            false,
778
114557
            writer.vault,
779
114557
            writer.cache,
780
114557
            Some(context.current_order),
781
114557
        )?;
782
114557
        let previous_child_count = children[child_index - 1].position.get().unwrap().count();
783
114557
        if let Some(free_space) = context.current_order.checked_sub(previous_child_count) {
784
110598
            if free_space > 0 {
785
76195
                should_backup = true;
786
                // First, take the children from the node that reported it needed to be split.
787
76195
                let stolen_children =
788
76195
                    match &mut children[child_index].position.get_mut().unwrap().node {
789
53015
                        BTreeNode::Leaf(children) => {
790
53015
                            let eligible_amount =
791
53015
                                children.len().saturating_sub(context.minimum_children);
792
53015
                            let amount_to_steal = free_space.min(eligible_amount);
793
53015
                            Children::Leaves(
794
53015
                                children
795
53015
                                    .splice(0..amount_to_steal, std::iter::empty())
796
53015
                                    .collect(),
797
53015
                            )
798
                        }
799
23180
                        BTreeNode::Interior(children) => {
800
23180
                            let eligible_amount =
801
23180
                                children.len().saturating_sub(context.minimum_children);
802
23180
                            let amount_to_steal = free_space.max(eligible_amount);
803
23180
                            Children::Interiors(
804
23180
                                children
805
23180
                                    .splice(0..amount_to_steal, std::iter::empty())
806
23180
                                    .collect(),
807
23180
                            )
808
                        }
809
                        BTreeNode::Uninitialized => unreachable!(),
810
                    };
811
                // Extend the previous node with the new values
812
76195
                let previous_child = children[child_index - 1].position.get_mut().unwrap();
813
76195
                match (&mut previous_child.node, stolen_children) {
814
53015
                    (BTreeNode::Leaf(children), Children::Leaves(new_entries)) => {
815
53015
                        children.extend(new_entries);
816
53015
                    }
817
23180
                    (BTreeNode::Interior(children), Children::Interiors(new_entries)) => {
818
23180
                        children.extend(new_entries);
819
23180
                    }
820
                    _ => unreachable!(),
821
                }
822
                // Update the statistics for the previous child.
823
76195
                previous_child.dirty = true;
824
76195
                let (max_key, stats) = {
825
76195
                    (
826
76195
                        previous_child.max_key().clone(),
827
76195
                        previous_child.stats(&context.reducer),
828
76195
                    )
829
76195
                };
830
76195
                children[child_index - 1].key = max_key;
831
76195
                children[child_index - 1].stats = stats;
832
76195

            
833
76195
                // Update the current child.
834
76195
                let child = children[child_index].position.get_mut().unwrap();
835
76195
                child.dirty = true;
836
76195
                let (max_key, stats) = { (child.max_key().clone(), child.stats(&context.reducer)) };
837
76195
                children[child_index].key = max_key;
838
76195
                children[child_index].stats = stats;
839
76195
                // If we couldn't balance enough, the caller will still need to
840
76195
                // balance further.
841
76195
                if child.count() <= context.current_order {
842
75456
                    return Ok((ChangeResult::Changed, should_backup));
843
739
                }
844
34403
            }
845
3959
        }
846

            
847
39101
        Ok((ChangeResult::Unchanged, should_backup))
848
114557
    }
849

            
850
46587
    fn steal_children_from_end<IndexedType, Indexer, Loader, IndexReducer>(
851
46587
        child_index: usize,
852
46587
        children: &mut [Interior<Index, ReducedIndex>],
853
46587
        context: &ModificationContext<
854
46587
            IndexedType,
855
46587
            Index,
856
46587
            ReducedIndex,
857
46587
            Indexer,
858
46587
            Loader,
859
46587
            IndexReducer,
860
46587
        >,
861
46587
        writer: &mut PagedWriter<'_>,
862
46587
    ) -> Result<ChangeResult, Error>
863
46587
    where
864
46587
        Indexer: FnMut(
865
46587
            &ArcBytes<'_>,
866
46587
            Option<&IndexedType>,
867
46587
            Option<&Index>,
868
46587
            &mut PagedWriter<'_>,
869
46587
        ) -> Result<KeyOperation<Index>, Error>,
870
46587
        IndexReducer: Reducer<Index, ReducedIndex>,
871
46587
        Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
872
46587
    {
873
46587
        // Check the previous child to see if it can accept any of this child.
874
46587
        children[child_index + 1].position.load(
875
46587
            writer.file,
876
46587
            false,
877
46587
            writer.vault,
878
46587
            writer.cache,
879
46587
            Some(context.current_order),
880
46587
        )?;
881
46587
        let next_child_count = children[child_index + 1].position.get().unwrap().count();
882
46587
        if let Some(free_space) = context.current_order.checked_sub(next_child_count) {
883
44890
            if free_space > 0 {
884
                // First, take the children from the node that reported it needed to be split.
885
35752
                let stolen_children =
886
35752
                    match &mut children[child_index].position.get_mut().unwrap().node {
887
15442
                        BTreeNode::Leaf(children) => {
888
15442
                            let eligible_amount =
889
15442
                                children.len().saturating_sub(context.minimum_children);
890
15442
                            let amount_to_steal = free_space.min(eligible_amount);
891
15442
                            Children::Leaves(
892
15442
                                children
893
15442
                                    .splice(children.len() - amount_to_steal.., std::iter::empty())
894
15442
                                    .collect(),
895
15442
                            )
896
                        }
897
20310
                        BTreeNode::Interior(children) => {
898
20310
                            let eligible_amount =
899
20310
                                children.len().saturating_sub(context.minimum_children);
900
20310
                            let amount_to_steal = free_space.max(eligible_amount);
901
20310
                            Children::Interiors(
902
20310
                                children
903
20310
                                    .splice(children.len() - amount_to_steal.., std::iter::empty())
904
20310
                                    .collect(),
905
20310
                            )
906
                        }
907
                        BTreeNode::Uninitialized => unreachable!(),
908
                    };
909
                // Extend the previous node with the new values
910
35752
                let next_child = children[child_index + 1].position.get_mut().unwrap();
911
35752
                match (&mut next_child.node, stolen_children) {
912
15442
                    (BTreeNode::Leaf(children), Children::Leaves(new_entries)) => {
913
15442
                        children.splice(0..0, new_entries);
914
15442
                    }
915
20310
                    (BTreeNode::Interior(children), Children::Interiors(new_entries)) => {
916
20310
                        children.splice(0..0, new_entries);
917
20310
                    }
918
                    _ => unreachable!(),
919
                }
920
                // Update the statistics for the previous child.
921
35752
                next_child.dirty = true;
922
35752
                let (max_key, stats) = {
923
35752
                    (
924
35752
                        next_child.max_key().clone(),
925
35752
                        next_child.stats(&context.reducer),
926
35752
                    )
927
35752
                };
928
35752
                children[child_index + 1].key = max_key;
929
35752
                children[child_index + 1].stats = stats;
930
35752

            
931
35752
                // Update the current child.
932
35752
                let child = children[child_index].position.get_mut().unwrap();
933
35752
                child.dirty = true;
934
35752
                let (max_key, stats) = { (child.max_key().clone(), child.stats(&context.reducer)) };
935
35752
                children[child_index].key = max_key;
936
35752
                children[child_index].stats = stats;
937
35752
                // If we couldn't balance enough, the caller will still need to
938
35752
                // balance further.
939
35752
                if child.count() <= context.current_order {
940
35541
                    return Ok(ChangeResult::Changed);
941
211
                }
942
9138
            }
943
1697
        }
944

            
945
11046
        Ok(ChangeResult::Unchanged)
946
46587
    }
947

            
948
273091
    fn count(&self) -> usize {
949
273091
        match &self.node {
950
            BTreeNode::Uninitialized => unreachable!(),
951
182006
            BTreeNode::Leaf(children) => children.len(),
952
91085
            BTreeNode::Interior(children) => children.len(),
953
        }
954
273091
    }
955

            
956
1713
    fn absorb(
957
1713
        &mut self,
958
1713
        children: Children<Index, ReducedIndex>,
959
1713
        insert_at_top: bool,
960
1713
        current_order: usize,
961
1713
        minimum_children: usize,
962
1713
        writer: &mut PagedWriter<'_>,
963
1713
    ) -> Result<ChangeResult, Error> {
964
1713
        self.dirty = true;
965
1713
        match (&mut self.node, children) {
966
1602
            (BTreeNode::Leaf(existing_children), Children::Leaves(leaves)) => {
967
1602
                if insert_at_top {
968
1567
                    existing_children.extend(leaves);
969
1567
                } else {
970
35
                    existing_children.splice(0..0, leaves);
971
35
                }
972

            
973
1602
                Ok(Self::clean_up_leaf(
974
1602
                    existing_children,
975
1602
                    current_order,
976
1602
                    minimum_children,
977
1602
                ))
978
            }
979
            (BTreeNode::Interior(existing_children), Children::Leaves(leaves)) => {
980
                // Pass the leaves along to the first or last child.
981
                let sponge = if insert_at_top {
982
                    existing_children.last_mut().unwrap()
983
                } else {
984
                    existing_children.first_mut().unwrap()
985
                };
986
                sponge.position.load(
987
                    writer.file,
988
                    false,
989
                    writer.vault,
990
                    writer.cache,
991
                    Some(current_order),
992
                )?;
993
                let sponge = sponge.position.get_mut().unwrap();
994
                sponge.absorb(
995
                    Children::Leaves(leaves),
996
                    insert_at_top,
997
                    current_order,
998
                    minimum_children,
999
                    writer,
                )
            }
111
            (BTreeNode::Interior(existing_children), Children::Interiors(interiors)) => {
111
                if insert_at_top {
103
                    existing_children.extend(interiors);
103
                } else {
8
                    existing_children.splice(0..0, interiors);
8
                }

            
111
                Ok(Self::clean_up_interior(
111
                    existing_children,
111
                    current_order,
111
                    minimum_children,
111
                ))
            }
            (BTreeNode::Leaf(_), Children::Interiors(_)) | (BTreeNode::Uninitialized, _) => {
                unreachable!()
            }
        }
1713
    }

            
2092
    pub(crate) fn split<R>(
2092
        &mut self,
2092
        reducer: &R,
2092
    ) -> (Interior<Index, ReducedIndex>, Interior<Index, ReducedIndex>)
2092
    where
2092
        R: Reducer<Index, ReducedIndex>,
2092
    {
2092
        let mut old_node = Self::from(BTreeNode::Uninitialized);
2092
        std::mem::swap(self, &mut old_node);
2092
        let (lower, upper) = match old_node.node {
1834
            BTreeNode::Leaf(mut children) => {
1834
                let upper = children
1834
                    .splice((children.len() + 1) / 2.., std::iter::empty())
1834
                    .collect::<Vec<_>>();
1834
                let lower = Self::from(BTreeNode::Leaf(children));
1834
                let upper = Self::from(BTreeNode::Leaf(upper));
1834

            
1834
                (lower, upper)
            }
258
            BTreeNode::Interior(mut children) => {
258
                let upper = children
258
                    .splice((children.len() + 1) / 2.., std::iter::empty())
258
                    .collect::<Vec<_>>();
258
                let lower = Self::from(BTreeNode::Interior(children));
258
                let upper = Self::from(BTreeNode::Interior(upper));
258

            
258
                (lower, upper)
            }
            BTreeNode::Uninitialized => unreachable!(),
        };

            
2092
        (Interior::new(lower, reducer), Interior::new(upper, reducer))
2092
    }

            
    /// Replaces this node with a [`BTreeNode::Interior`] containing two
    /// children nodes that were created by splitting the children of this node.
    ///
    /// This function should only be called on a root node.
2092
    pub fn split_root<R>(&mut self, reducer: &R)
2092
    where
2092
        R: Reducer<Index, ReducedIndex>,
2092
    {
2092
        let (lower, upper) = self.split(reducer);
2092
        self.node = BTreeNode::Interior(vec![lower, upper]);
2092
    }

            
    /// Returns the collected statistics for this node.
    #[must_use]
1096096
    pub fn stats<R: Reducer<Index, ReducedIndex>>(&self, reducer: &R) -> ReducedIndex {
1096096
        match &self.node {
4412721
            BTreeNode::Leaf(children) => reducer.reduce(children.iter().map(|c| &c.index)),
24568825
            BTreeNode::Interior(children) => reducer.rereduce(children.iter().map(|c| &c.stats)),
            BTreeNode::Uninitialized => unreachable!(),
        }
1096096
    }

            
    /// Returns the highest-ordered key contained in this node.
    #[must_use]
    #[allow(clippy::missing_panics_doc)]
1028479
    pub fn max_key(&self) -> &ArcBytes<'static> {
1028479
        match &self.node {
506422
            BTreeNode::Leaf(children) => &children.last().unwrap().key,
522057
            BTreeNode::Interior(children) => &children.last().unwrap().key,
            BTreeNode::Uninitialized => unreachable!(),
        }
1028479
    }

            
    /// Scans this entry looking for keys in `range`.
    ///
    /// For each interior node that may have keys in `range`,
    /// [`ScanArgs::node_evaluator`] is invoked. This allows fine-grained
    /// control over skipping entire branches of a tree or aborting a scan
    /// early. If [`ScanEvaluation::ReadData`] is returned, the interior node's
    /// children will be scanned.
    ///
    /// Once a leaf node is encountered, each key is checked against `range` and
    /// [`ScanArgs::key_evaluator`] is invoked. If [`ScanEvaluation::ReadData`]
    /// is returned, the stored value for the entry will be loaded and
    /// [`ScanArgs::data_callback`] will be invoked.
    #[cfg_attr(
        feature = "tracing",
1964792
        tracing::instrument(skip(self, args, file, vault, cache))
    )]
    pub fn scan<
        'k,
        'keys,
        CallerError: Display + Debug,
        KeyRangeBounds,
        NodeEvaluator,
        KeyEvaluator,
        ScanDataCallback,
    >(
        &self,
        range: &'keys KeyRangeBounds,
        args: &mut ScanArgs<
            ArcBytes<'static>,
            Index,
            ReducedIndex,
            CallerError,
            NodeEvaluator,
            KeyEvaluator,
            ScanDataCallback,
        >,
        file: &mut dyn File,
        vault: Option<&dyn AnyVault>,
        cache: Option<&ChunkCache>,
        current_depth: usize,
    ) -> Result<bool, AbortError<CallerError>>
    where
        NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation,
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
        ScanDataCallback: FnMut(
            ArcBytes<'static>,
            &Index,
            ArcBytes<'static>,
        ) -> Result<(), AbortError<CallerError>>,
    {
        match &self.node {
            BTreeNode::Leaf(children) => {
                for child in DirectionalSliceIterator::new(args.forwards, children) {
                    if range.contains(&child.key.as_slice()) {
                        match (args.key_evaluator)(&child.key, &child.index) {
                            ScanEvaluation::ReadData => {
                                if child.index.position() > 0 {
                                    let data = match read_chunk(
                                        child.index.position(),
                                        false,
                                        file,
                                        vault,
                                        cache,
                                    )? {
                                        CacheEntry::ArcBytes(contents) => contents,
                                        CacheEntry::Decoded(_) => unreachable!(),
                                    };
                                    (args.data_callback)(child.key.clone(), &child.index, data)?;
                                }
                            }
                            ScanEvaluation::Skip => {}
                            ScanEvaluation::Stop => return Ok(false),
                        };
                    }
                }
            }
            BTreeNode::Interior(children) => {
                for (index, child) in
                    DirectionalSliceIterator::new(args.forwards, children).enumerate()
                {
                    // The keys in this child range from the previous child's key (exclusive) to the entry's key (inclusive).
                    let start_bound = range.start_bound();
                    let end_bound = range.end_bound();
                    if args.forwards {
                        if index > 0 {
                            let previous_entry = &children[index - 1];

            
                            // One the previous entry's key is less than the end
                            // bound, we can break out of the loop.
                            match end_bound {
                                Bound::Included(key) => {
                                    if previous_entry.key > **key {
                                        break;
                                    }
                                }
                                Bound::Excluded(key) => {
                                    if &previous_entry.key >= key {
                                        break;
                                    }
                                }
                                Bound::Unbounded => {}
                            }
                        }
                    } else {
                        // TODO need to write the logic for breaking out when iterating backwards.
                    }

            
                    // Keys in this child could match as long as the start bound
                    // is less than the key for this child.
                    match start_bound {
                        Bound::Included(key) => {
                            if child.key < **key {
                                continue;
                            }
                        }
                        Bound::Excluded(key) => {
                            if &child.key <= key {
                                continue;
                            }
                        }
                        Bound::Unbounded => {}
                    }

            
                    let keep_scanning =
                        match (args.node_evaluator)(&child.key, &child.stats, current_depth) {
                            ScanEvaluation::Stop => false,
                            ScanEvaluation::ReadData => child.position.map_loaded_entry(
                                file,
                                vault,
                                cache,
                                Some(children.len()),
949714
                                |entry, file| {
949714
                                    entry.scan(range, args, file, vault, cache, current_depth + 1)
949714
                                },
                            )?,
                            ScanEvaluation::Skip => true,
                        };
                    if !keep_scanning {
                        return Ok(false);
                    }
                }
            }
            BTreeNode::Uninitialized => unreachable!(),
        }
        Ok(true)
    }

            
    /// Read the values stored for one or more keys.
    ///
    /// `key_evaluator` is invoked once for each key found. If
    /// [`ScanEvaluation::ReadData`] is returned, the key's value will be loaded
    /// and `key_reader` will be invoked with the result.
    ///
    /// The `key_reader` function is not invoked immediately in an effort to
    /// optimize the order of reads from the disk.
58344
    pub fn get_multiple<KeyEvaluator, KeyReader, Keys, Bytes>(
58344
        &self,
58344
        keys: &mut Keys,
58344
        mut key_evaluator: KeyEvaluator,
58344
        mut key_reader: KeyReader,
58344
        file: &mut dyn File,
58344
        vault: Option<&dyn AnyVault>,
58344
        cache: Option<&ChunkCache>,
58344
    ) -> Result<(), Error>
58344
    where
58344
        KeyEvaluator: for<'k> FnMut(&'k ArcBytes<'static>, &'k Index) -> ScanEvaluation,
58344
        KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Index) -> Result<(), Error>,
58344
        Keys: Iterator<Item = Bytes>,
58344
        Bytes: AsRef<[u8]>,
58344
    {
58344
        let mut positions_to_read = Vec::new();
58344
        self.get(
58344
            &mut KeyRange::new(keys),
58344
            &mut |key, index| key_evaluator(key, index),
58344
            &mut |key, index| {
56419
                // Deleted keys are stored with a 0 position.
56419
                if index.position() > 0 {
54415
                    positions_to_read.push((key, index.clone()));
54415
                }
56419
                Ok(())
58344
            },
58344
            file,
58344
            vault,
58344
            cache,
58344
        )?;

            
        // Sort by position on disk
58344
        positions_to_read.sort_by(|a, b| a.1.position().cmp(&b.1.position()));

            
112759
        for (key, index) in positions_to_read {
54415
            if index.position() > 0 {
54415
                match read_chunk(index.position(), false, file, vault, cache)? {
54415
                    CacheEntry::ArcBytes(contents) => {
54415
                        key_reader(key, contents, index)?;
                    }
                    CacheEntry::Decoded(_) => unreachable!(),
                };
            } else {
                key_reader(key, ArcBytes::default(), index)?;
            }
        }
58344
        Ok(())
58344
    }

            
    /// Read the values stored within a range of keys.
    ///
    /// `key_evaluator` is invoked once for each key found. If
    /// [`ScanEvaluation::ReadData`] is returned, the key's value will be loaded
    /// and `key_reader` will be invoked with the result.
    ///
    /// The `key_reader` function is not invoked immediately in an effort to
    /// optimize the order of reads from the disk.
    #[cfg_attr(
        feature = "tracing",
395468
        tracing::instrument(skip(self, key_evaluator, keys, key_reader, file, vault, cache))
    )]
    fn get<KeyEvaluator, KeyReader, Keys, Bytes>(
        &self,
        keys: &mut KeyRange<Keys, Bytes>,
        key_evaluator: &mut KeyEvaluator,
        key_reader: &mut KeyReader,
        file: &mut dyn File,
        vault: Option<&dyn AnyVault>,
        cache: Option<&ChunkCache>,
    ) -> Result<bool, Error>
    where
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
        KeyReader: FnMut(ArcBytes<'static>, &Index) -> Result<(), AbortError<Infallible>>,
        Keys: Iterator<Item = Bytes>,
        Bytes: AsRef<[u8]>,
    {
        match &self.node {
            BTreeNode::Leaf(children) => {
                let mut last_index = 0;
                let mut took_one_key = false;
                while let Some(key) = keys.current_key() {
160480
                    match children[last_index..].binary_search_by(|child| (*child.key).cmp(key)) {
                        Ok(matching) => {
                            took_one_key = true;
                            keys.next();
                            last_index += matching;
                            let entry = &children[last_index];
                            match key_evaluator(&entry.key, &entry.index) {
                                ScanEvaluation::ReadData => {
                                    key_reader(entry.key.clone(), &entry.index)?;
                                }
                                ScanEvaluation::Skip => {}
                                ScanEvaluation::Stop => return Ok(false),
                            }
                        }
                        Err(location) => {
                            last_index += location;
                            if last_index == children.len() && took_one_key {
                                // No longer matching within this tree
                                break;
                            }

            
                            // Key not found.
                            took_one_key = true;
                            keys.next();
                        }
                    }
                }
            }
            BTreeNode::Interior(children) => {
                let mut last_index = 0;
                while let Some(key) = keys.current_key() {
                    let containing_node_index = children[last_index..]
554918
                        .binary_search_by(|child| (*child.key).cmp(key))
136075
                        .unwrap_or_else(|not_found| not_found);
                    last_index += containing_node_index;

            
                    // This isn't guaranteed to succeed because we add one. If
                    // the key being searched for isn't contained, it will be
                    // greater than any of the node's keys.
                    if let Some(child) = children.get(last_index) {
                        let keep_scanning = child.position.map_loaded_entry(
                            file,
                            vault,
                            cache,
                            Some(children.len()),
139390
                            |entry, file| {
139390
                                entry
139390
                                    .get(keys, key_evaluator, key_reader, file, vault, cache)
139390
                                    .map_err(AbortError::Nebari)
139390
                            },
                        )?;
                        if !keep_scanning {
                            break;
                        }
                        // The leaf will consume all the keys that can match.
                        // Thus, we can skip it on the next iteration.
                        last_index += 1;
                    } else {
                        break;
                    }
                }
            }
            BTreeNode::Uninitialized => unreachable!(),
        }
        Ok(true)
    }

            
    /// Recursively copy all stored data from `file` to `writer`.
    ///
    /// Returns true if any data was copied.
    #[allow(clippy::too_many_arguments)]
657660
    pub fn copy_data_to<Callback>(
657660
        &mut self,
657660
        include_nodes: NodeInclusion,
657660
        file: &mut dyn File,
657660
        copied_chunks: &mut HashMap<u64, u64>,
657660
        writer: &mut PagedWriter<'_>,
657660
        vault: Option<&dyn AnyVault>,
657660
        scratch: &mut Vec<u8>,
657660
        index_callback: &mut Callback,
657660
    ) -> Result<bool, Error>
657660
    where
657660
        Callback: FnMut(
657660
            &ArcBytes<'static>,
657660
            &mut Index,
657660
            &mut dyn File,
657660
            &mut HashMap<u64, u64>,
657660
            &mut PagedWriter<'_>,
657660
            Option<&dyn AnyVault>,
657660
        ) -> Result<bool, Error>,
657660
    {
657660
        let mut any_changes = false;
657660
        match &mut self.node {
606668
            BTreeNode::Leaf(children) => {
4199892
                for child in children {
                    any_changes =
3593224
                        child.copy_data_to(file, copied_chunks, writer, vault, index_callback)?
558104
                            || any_changes;
                }
            }
50992
            BTreeNode::Interior(children) => {
701640
                for child in children {
650648
                    any_changes = child.copy_data_to(
650648
                        include_nodes.next(),
650648
                        file,
650648
                        copied_chunks,
650648
                        writer,
650648
                        vault,
650648
                        scratch,
650648
                        index_callback,
650648
                    )? || any_changes;
                }
            }
            BTreeNode::Uninitialized => unreachable!(),
        }
        // Regardless of if data was copied, data locations have been updated,
        // so the node is considered dirty.
657660
        self.dirty |= true;
657660
        Ok(any_changes)
657660
    }
}

            
/// Selects whether a data copy carries the tree's nodes along with the data,
/// or the data alone.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeInclusion {
    /// Nodes are never part of this copy.
    Exclude,
    /// The current node is left out, but every node below it is included.
    IncludeNext,
    /// Every node is included.
    Include,
}

impl NodeInclusion {
    /// Returns the inclusion mode to use one level further down the tree.
    ///
    /// `Exclude` stays `Exclude`; both `IncludeNext` and `Include` become
    /// `Include`.
    pub(crate) const fn next(self) -> Self {
        if matches!(self, Self::Exclude) {
            Self::Exclude
        } else {
            Self::Include
        }
    }

    /// Returns `true` only for `Include`; `IncludeNext` does not count as
    /// including the current node.
    pub(crate) const fn should_include(self) -> bool {
        match self {
            Self::Include => true,
            Self::Exclude | Self::IncludeNext => false,
        }
    }
}

            
impl<
        Index: Clone + BinarySerialization + Debug + 'static,
        ReducedIndex: Clone + BinarySerialization + Debug + 'static,
    > BinarySerialization for BTreeEntry<Index, ReducedIndex>
{
738459
    fn serialize_to(
738459
        &mut self,
738459
        writer: &mut Vec<u8>,
738459
        paged_writer: &mut PagedWriter<'_>,
738459
    ) -> Result<usize, Error> {
738459
        self.dirty = false;
738459
        let mut bytes_written = 0;
738459
        // The next byte determines the node type.
738459
        match &mut self.node {
513674
            BTreeNode::Leaf(leafs) => {
4090323
                assert_children_order!(leafs);
513674
                writer.write_u8(1)?;
513674
                bytes_written += 1;
5117654
                for leaf in leafs {
4603980
                    bytes_written += leaf.serialize_to(writer, paged_writer)?;
                }
            }
224785
            BTreeNode::Interior(interiors) => {
4139817
                assert_children_order!(interiors);
224785
                writer.write_u8(0)?;
224785
                bytes_written += 1;
4589387
                for interior in interiors {
4364602
                    bytes_written += interior.serialize_to(writer, paged_writer)?;
                }
            }
            BTreeNode::Uninitialized => unreachable!(),
        }

            
738459
        Ok(bytes_written)
738459
    }

            
1938147
    fn deserialize_from(
1938147
        reader: &mut ArcBytes<'_>,
1938147
        current_order: Option<usize>,
1938147
    ) -> Result<Self, Error> {
1938147
        let node_header = reader.read_u8()?;
1938147
        match node_header {
            0 => {
                // Interior
374262
                let mut nodes = Vec::new();
374262
                if let Some(current_order) = current_order {
314272
                    nodes.reserve(current_order + 1);
314272
                }
5975408
                while !reader.is_empty() {
5601146
                    nodes.push(Interior::deserialize_from(reader, current_order)?);
                }
374262
                Ok(Self {
374262
                    node: BTreeNode::Interior(nodes),
374262
                    dirty: false,
374262
                })
            }
            1 => {
                // Leaf
1563885
                let mut nodes = Vec::new();
1563885
                if let Some(current_order) = current_order {
957083
                    nodes.reserve(current_order + 1);
957083
                }
10511482
                while !reader.is_empty() {
8947597
                    nodes.push(KeyEntry::deserialize_from(reader, current_order)?);
                }
1563885
                Ok(Self {
1563885
                    node: BTreeNode::Leaf(nodes),
1563885
                    dirty: false,
1563885
                })
            }
            _ => Err(Error::data_integrity("invalid node header")),
        }
1938147
    }
}

            
/// An operation to perform on a key.
///
/// `T` is the type of the payload carried by [`KeyOperation::Set`]; the other
/// variants carry no data.
#[derive(Debug)]
pub enum KeyOperation<T> {
    /// Do not alter the key.
    Skip,
    /// Set the key to the new value.
    Set(T),
    /// Remove the key.
    Remove,
}

            
/// Iterates over a borrowed slice either front-to-back or back-to-front,
/// with the direction chosen at construction time.
struct DirectionalSliceIterator<'a, I> {
    /// `true` walks from the first element to the last; `false` walks from
    /// the last element to the first.
    forwards: bool,
    /// Cursor into `contents`. Going forwards it is the index of the next
    /// element to yield; going backwards it is one past that index.
    index: usize,
    /// The slice being iterated.
    contents: &'a [I],
}

impl<'a, I> DirectionalSliceIterator<'a, I> {
    /// Creates an iterator over `contents`, positioned at the start when
    /// `forwards` is `true` and just past the end otherwise.
    pub const fn new(forwards: bool, contents: &'a [I]) -> Self {
        let index = if forwards { 0 } else { contents.len() };
        Self {
            forwards,
            index,
            contents,
        }
    }
}

impl<'a, I> Iterator for DirectionalSliceIterator<'a, I> {
    type Item = &'a I;

    fn next(&mut self) -> Option<Self::Item> {
        if self.forwards {
            // `get` yields `None` once the cursor reaches the end, leaving
            // the cursor where it is.
            let element = self.contents.get(self.index)?;
            self.index += 1;
            Some(element)
        } else {
            // A cursor of zero means nothing is left in front of it.
            self.index = self.index.checked_sub(1)?;
            Some(&self.contents[self.index])
        }
    }
}

            
/// Arguments for a tree scan operation.
///
/// Bundles the scan direction with the three callbacks that drive a scan.
/// The type parameters are:
///
/// - `Value`: the third argument handed to `DataCallback`.
/// - `Index`: the per-key index type seen by `KeyEvaluator` and
///   `DataCallback`.
/// - `ReducedIndex`: the index type seen by `NodeEvaluator`.
/// - `CallerError`: the error type a `DataCallback` may abort with, wrapped
///   in [`AbortError`].
pub struct ScanArgs<
    Value,
    Index,
    ReducedIndex,
    CallerError: Display + Debug,
    NodeEvaluator,
    KeyEvaluator,
    DataCallback,
> where
    // NOTE(review): the trailing `usize` is presumably the node's depth in
    // the tree -- confirm against the call sites.
    NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation,
    KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
    DataCallback: FnMut(ArcBytes<'static>, &Index, Value) -> Result<(), AbortError<CallerError>>,
{
    /// Controls the order of the scan. If true, the scan starts at the lowest
    /// ordered key and scans forward. If false, the scan starts at the highest
    /// ordered key and scans in reverse.
    pub forwards: bool,
    /// A callback that is invoked for each interior node being considered in
    /// the scan operation.
    pub node_evaluator: NodeEvaluator,
    /// A callback that is invoked for each matching key found during the scan
    /// operation.
    pub key_evaluator: KeyEvaluator,
    /// A callback that is invoked once data has been loaded for a key during
    /// the scan operation.
    pub data_callback: DataCallback,
    // `Value`, `Index`, `ReducedIndex` and `CallerError` only appear in the
    // callback bounds, so this marker is what ties them to the struct.
    _phantom: PhantomData<(Value, Index, ReducedIndex, CallerError)>,
}

            
impl<
        Value,
        Index,
        ReducedIndex,
        CallerError: Display + Debug,
        NodeEvaluator,
        KeyEvaluator,
        DataCallback,
    > ScanArgs<Value, Index, ReducedIndex, CallerError, NodeEvaluator, KeyEvaluator, DataCallback>
where
    NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation,
    KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
    DataCallback: FnMut(ArcBytes<'static>, &Index, Value) -> Result<(), AbortError<CallerError>>,
{
    /// Returns a new instance of the scan arguments.
32682
    pub fn new(
32682
        forwards: bool,
32682
        node_evaluator: NodeEvaluator,
32682
        key_evaluator: KeyEvaluator,
32682
        data_callback: DataCallback,
32682
    ) -> Self {
32682
        Self {
32682
            forwards,
32682
            node_evaluator,
32682
            key_evaluator,
32682
            data_callback,
32682
            _phantom: PhantomData,
32682
        }
32682
    }
}