1
use std::{
2
    any::Any,
3
    borrow::Cow,
4
    collections::HashMap,
5
    fmt::{Debug, Display},
6
    marker::PhantomData,
7
    ops::RangeBounds,
8
    sync::Arc,
9
};
10

            
11
use crate::{
12
    error::Error,
13
    io::{File, ManagedFile, PathId},
14
    roots::AnyTransactionTree,
15
    transaction::{TransactionId, TransactionManager},
16
    tree::{
17
        btree::ScanArgs, state::AnyTreeState, Modification, ModificationResult, PageHeader,
18
        PagedWriter, Reducer, ScanEvaluation, State, TreeFile,
19
    },
20
    vault::AnyVault,
21
    AbortError, ArcBytes, ChunkCache, Context, TransactionTree, Vault,
22
};
23

            
24
/// A B-Tree root implementation.
25
pub trait Root: Debug + Send + Sync + Clone + 'static {
26
    /// The unique header byte for this root.
27
    const HEADER: PageHeader;
28

            
29
    /// The primary index type contained within this root.
30
    type Index: Clone + Debug + 'static;
31
    /// The primary index type contained within this root.
32
    type ReducedIndex: Clone + Debug + 'static;
33
    /// The reducer that reduces `Index`es and re-reduces `ReducedIndex`es.
34
    type Reducer: Reducer<Self::Index, Self::ReducedIndex> + 'static;
35
    /// The value type stored by this root.
36
    type Value: Debug + 'static;
37

            
38
    /// Returns a new instance with the provided reducer.
39
    fn default_with(reducer: Self::Reducer) -> Self;
40

            
41
    /// Returns the instance's reducer.
42
    fn reducer(&self) -> &Self::Reducer;
43

            
44
    /// Returns the number of values contained in this tree, not including
45
    /// deleted records.
46
    fn count(&self) -> u64;
47

            
48
    /// Returns a reference to a named tree that contains this type of root.
49
24
    fn tree<File: ManagedFile>(name: impl Into<Cow<'static, str>>) -> TreeRoot<Self, File>
50
24
    where
51
24
        Self::Reducer: Default,
52
24
    {
53
24
        TreeRoot {
54
24
            name: name.into(),
55
24
            vault: None,
56
24
            reducer: Arc::new(<Self::Reducer as Default>::default()),
57
24
            _phantom: PhantomData,
58
24
        }
59
24
    }
60

            
61
    /// Returns a reference to a named tree that contains this type of root.
62
24590
    fn tree_with_reducer<File: ManagedFile>(
63
24590
        name: impl Into<Cow<'static, str>>,
64
24590
        reducer: Self::Reducer,
65
24590
    ) -> TreeRoot<Self, File> {
66
24590
        TreeRoot {
67
24590
            name: name.into(),
68
24590
            vault: None,
69
24590
            reducer: Arc::new(reducer),
70
24590
            _phantom: PhantomData,
71
24590
        }
72
24590
    }
73

            
74
    /// Returns true if the root needs to be saved.
75
    fn dirty(&self) -> bool;
76

            
77
    /// Returns true if the tree is initialized.
78
    fn initialized(&self) -> bool;
79

            
80
    /// Resets the state to the default, initialized state. After calling this,
81
    /// `Self::initialized()` will return true.
82
    fn initialize_default(&mut self);
83

            
84
    /// Serialize the root and return the bytes. Writes any additional data to
85
    /// `paged_writer` in the process of serialization.
86
    fn serialize(
87
        &mut self,
88
        paged_writer: &mut PagedWriter<'_>,
89
        output: &mut Vec<u8>,
90
    ) -> Result<(), Error>;
91

            
92
    /// Deserializes the root from `bytes`.
93
    fn deserialize(bytes: ArcBytes<'_>, reducer: Self::Reducer) -> Result<Self, Error>;
94

            
95
    /// Returns the current transaction id.
96
    fn transaction_id(&self) -> TransactionId;
97

            
98
    /// Modifies the tree. Returns a list of modified keys and their updated
99
    /// indexes, if the keys are still present.
100
    fn modify<'a, 'w>(
101
        &'a mut self,
102
        modification: Modification<'_, Self::Value, Self::Index>,
103
        writer: &'a mut PagedWriter<'w>,
104
        max_order: Option<usize>,
105
    ) -> Result<Vec<ModificationResult<Self::Index>>, Error>;
106

            
107
    /// Iterates over the tree looking for `keys`. `keys` must be sorted.
108
    /// `key_evaluator` is invoked for each key as it is found, allowing for
109
    /// decisions on how to handle each key. `key_reader` will be invoked for
110
    /// each key that is requested to be read, but it might be invoked at a
111
    /// later time and in a different order.
112
    fn get_multiple<'keys, KeyEvaluator, KeyReader, Keys>(
113
        &self,
114
        keys: &mut Keys,
115
        key_evaluator: &mut KeyEvaluator,
116
        key_reader: &mut KeyReader,
117
        file: &mut dyn File,
118
        vault: Option<&dyn AnyVault>,
119
        cache: Option<&ChunkCache>,
120
    ) -> Result<(), Error>
121
    where
122
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation,
123
        KeyReader: FnMut(ArcBytes<'static>, Self::Value, Self::Index) -> Result<(), Error>,
124
        Keys: Iterator<Item = &'keys [u8]>;
125

            
126
    /// Scans the tree over `range`. `args.key_evaluator` is invoked for each key as
127
    /// it is found, allowing for decisions on how to handle each key.
128
    /// `args.key_reader` will be invoked for each key that is requested to be read,
129
    /// but it might be invoked at a later time and in a different order.
130
    fn scan<
131
        'keys,
132
        CallerError: Display + Debug,
133
        NodeEvaluator,
134
        KeyRangeBounds,
135
        KeyEvaluator,
136
        ScanDataCallback,
137
    >(
138
        &self,
139
        range: &'keys KeyRangeBounds,
140
        args: ScanArgs<
141
            Self::Value,
142
            Self::Index,
143
            Self::ReducedIndex,
144
            CallerError,
145
            NodeEvaluator,
146
            KeyEvaluator,
147
            ScanDataCallback,
148
        >,
149
        file: &mut dyn File,
150
        vault: Option<&dyn AnyVault>,
151
        cache: Option<&ChunkCache>,
152
    ) -> Result<bool, AbortError<CallerError>>
153
    where
154
        NodeEvaluator: FnMut(&ArcBytes<'static>, &Self::ReducedIndex, usize) -> ScanEvaluation,
155
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation,
156
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
157
        ScanDataCallback: FnMut(
158
            ArcBytes<'static>,
159
            &Self::Index,
160
            Self::Value,
161
        ) -> Result<(), AbortError<CallerError>>;
162

            
163
    /// Copies all data from `file` into `writer`, updating `self` with the new
164
    /// file positions.
165
    fn copy_data_to(
166
        &mut self,
167
        include_nodes: bool,
168
        file: &mut dyn File,
169
        copied_chunks: &mut HashMap<u64, u64>,
170
        writer: &mut PagedWriter<'_>,
171
        vault: Option<&dyn AnyVault>,
172
    ) -> Result<(), Error>;
173
}
174

            
175
/// A named tree with a specific root type.
176
#[must_use]
177
pub struct TreeRoot<R: Root, File: ManagedFile> {
178
    /// The name of the tree.
179
    pub name: Cow<'static, str>,
180
    /// The vault to use when opening the tree. If not set, the `Context` will
181
    /// provide the vault.
182
    pub vault: Option<Arc<dyn AnyVault>>,
183
    /// The [`Reducer`] for this tree.
184
    pub(crate) reducer: Arc<dyn AnyReducer>,
185
    _phantom: PhantomData<(R, File)>,
186
}
187

            
188
/// A type-erased reducer.
///
/// Implementors can be stored as `dyn AnyReducer` and later recovered as their
/// concrete type by downcasting the value returned from
/// [`as_any`](AnyReducer::as_any).
pub trait AnyReducer: Send + Sync + 'static {
    /// Returns `self` as a [`dyn Any`](Any), enabling downcasting back to the
    /// concrete type.
    fn as_any(&self) -> &dyn Any;
}

/// Blanket implementation: every thread-safe `'static` type is an
/// [`AnyReducer`].
impl<T> AnyReducer for T
where
    T: Any + Send + Sync + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self
    }
}
200

            
201
impl<R: Root, File: ManagedFile> TreeRoot<R, File> {
202
    /// Replaces the vault currenty set with `vault`.
203
3
    pub fn with_vault<V: Vault>(mut self, vault: V) -> Self {
204
3
        self.vault = Some(Arc::new(vault));
205
3
        self
206
3
    }
207
}
208

            
209
impl<R: Root, File: ManagedFile> Clone for TreeRoot<R, File> {
210
19
    fn clone(&self) -> Self {
211
19
        Self {
212
19
            name: self.name.clone(),
213
19
            vault: self.vault.clone(),
214
19
            reducer: self.reducer.clone(),
215
19
            _phantom: PhantomData,
216
19
        }
217
19
    }
218
}
219

            
220
/// A named tree that can be used in a transaction.
221
pub trait AnyTreeRoot<File: ManagedFile> {
222
    /// The name of the tree.
223
    fn name(&self) -> &str;
224
    /// The default state for the underlying root type.
225
    fn default_state(&self) -> Box<dyn AnyTreeState>;
226
    /// Begins a transaction on this tree.
227
    fn begin_transaction(
228
        &self,
229
        transaction_id: TransactionId,
230
        file_path: &PathId,
231
        state: &dyn AnyTreeState,
232
        context: &Context<File::Manager>,
233
        transactions: Option<&TransactionManager<File::Manager>>,
234
    ) -> Result<Box<dyn AnyTransactionTree<File>>, Error>;
235
}
236

            
237
impl<R: Root, File: ManagedFile> AnyTreeRoot<File> for TreeRoot<R, File> {
238
122994
    fn name(&self) -> &str {
239
122994
        &self.name
240
122994
    }
241

            
242
19
    fn default_state(&self) -> Box<dyn AnyTreeState> {
243
19
        Box::new(State::<R>::new(
244
19
            None,
245
19
            None,
246
19
            R::default_with(
247
19
                self.reducer
248
19
                    .as_ref()
249
19
                    .as_any()
250
19
                    .downcast_ref::<R::Reducer>()
251
19
                    .unwrap()
252
19
                    .clone(),
253
19
            ),
254
19
        ))
255
19
    }
256

            
257
24595
    fn begin_transaction(
258
24595
        &self,
259
24595
        transaction_id: TransactionId,
260
24595
        file_path: &PathId,
261
24595
        state: &dyn AnyTreeState,
262
24595
        context: &Context<File::Manager>,
263
24595
        transactions: Option<&TransactionManager<File::Manager>>,
264
24595
    ) -> Result<Box<dyn AnyTransactionTree<File>>, Error> {
265
24595
        let context = self.vault.as_ref().map_or_else(
266
24595
            || Cow::Borrowed(context),
267
24595
            |vault| Cow::Owned(context.clone().with_any_vault(vault.clone())),
268
24595
        );
269
24595
        let tree = TreeFile::write(
270
24595
            file_path,
271
24595
            state.as_any().downcast_ref::<State<R>>().unwrap().clone(),
272
24595
            &context,
273
24595
            transactions,
274
24595
        )?;
275

            
276
24595
        Ok(Box::new(TransactionTree {
277
24595
            transaction_id,
278
24595
            tree,
279
24595
        }))
280
24595
    }
281
}