1
use std::{
2
    collections::HashMap,
3
    fmt::{Debug, Display},
4
    ops::RangeBounds,
5
};
6

            
7
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
8

            
9
use super::{
10
    btree::BTreeEntry,
11
    by_id::{ByIdStats, UnversionedByIdIndex},
12
    modify::Modification,
13
    serialization::BinarySerialization,
14
    PagedWriter, ScanEvaluation,
15
};
16
use crate::{
17
    chunk_cache::CacheEntry,
18
    error::{Error, InternalError},
19
    io::File,
20
    roots::AbortError,
21
    transaction::TransactionId,
22
    tree::{
23
        btree::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs},
24
        by_id::ByIdIndexer,
25
        dynamic_order, BTreeNode, ChangeResult, ModificationResult, PageHeader, Root,
26
    },
27
    vault::AnyVault,
28
    ArcBytes, ChunkCache, ErrorKind,
29
};
30

            
31
/// An unversioned tree with no additional indexed data.
32
pub type Unversioned = UnversionedTreeRoot<()>;
33

            
34
/// A versioned B-Tree root. This tree root internally uses two btrees, one to
35
/// keep track of all writes using a unique "sequence" ID, and one that keeps
36
/// track of all key-value pairs.
37
31169
#[derive(Clone, Debug)]
38
pub struct UnversionedTreeRoot<Index>
39
where
40
    Index: Clone + super::EmbeddedIndex<ArcBytes<'static>> + Debug + 'static,
41
{
42
    /// The transaction ID of the tree root. If this transaction ID isn't
43
    /// present in the transaction log, this root should not be trusted.
44
    pub transaction_id: Option<TransactionId>,
45
    /// The by-id B-Tree.
46
    pub by_id_root:
47
        BTreeEntry<UnversionedByIdIndex<Index, ArcBytes<'static>>, ByIdStats<Index::Reduced>>,
48

            
49
    reducer: <Self as Root>::Reducer,
50
}
51

            
52
impl<Index> Default for UnversionedTreeRoot<Index>
53
where
54
    Index: Clone + super::EmbeddedIndex<ArcBytes<'static>> + Debug + 'static,
55
    <Self as Root>::Reducer: Default,
56
{
57
4064
    fn default() -> Self {
58
4064
        Self {
59
4064
            transaction_id: None,
60
4064
            by_id_root: BTreeEntry::default(),
61
4064
            reducer: <<Self as Root>::Reducer as Default>::default(),
62
4064
        }
63
4064
    }
64
}
65

            
66
impl<Index> UnversionedTreeRoot<Index>
67
where
68
    Index: Clone + super::EmbeddedIndex<ArcBytes<'static>> + Debug + 'static,
69
{
70
    fn modify_id_root<'a, 'w>(
71
        &'a mut self,
72
        mut modification: Modification<
73
            '_,
74
            ArcBytes<'static>,
75
            UnversionedByIdIndex<Index, ArcBytes<'static>>,
76
        >,
77
        writer: &'a mut PagedWriter<'w>,
78
        max_order: Option<usize>,
79
    ) -> Result<Vec<ModificationResult<UnversionedByIdIndex<Index, ArcBytes<'static>>>>, Error>
80
    {
81
19613
        modification.prepare()?;
82

            
83
19613
        let total_keys =
84
19613
            self.by_id_root.stats(self.reducer()).total_keys() + modification.keys.len() as u64;
85
19613
        let by_id_order = dynamic_order(total_keys, max_order);
86
19613
        let minimum_children = by_id_order / 2 - 1;
87
19613
        let minimum_children =
88
19613
            minimum_children.min(usize::try_from(total_keys).unwrap_or(usize::MAX));
89
19613

            
90
19613
        let reducer = self.reducer.clone();
91
19613

            
92
19613
        let mut results = Vec::with_capacity(modification.keys.len());
93

            
94
39231
        while !modification.keys.is_empty() {
95
19618
            match self.by_id_root.modify(
96
19618
                &mut modification,
97
19618
                &mut ModificationContext::new(
98
19618
                    by_id_order,
99
19618
                    minimum_children,
100
19618
                    |key: &ArcBytes<'_>,
101
                     value: Option<&ArcBytes<'static>>,
102
                     _existing_index,
103
                     writer: &mut PagedWriter<'_>| {
104
231958
                        if let Some(value) = value {
105
150495
                            let position = writer.write_chunk_cached(value.clone())?;
106
                            // write_chunk errors if it can't fit within a u32
107
                            #[allow(clippy::cast_possible_truncation)]
108
150495
                            let value_length = value.len() as u32;
109
150495
                            let new_index = UnversionedByIdIndex::new(
110
150495
                                value_length,
111
150495
                                position,
112
150495
                                reducer.0.index(key, Some(value)),
113
150495
                            );
114
150495
                            results.push(ModificationResult {
115
150495
                                key: key.to_owned(),
116
150495
                                index: Some(new_index.clone()),
117
150495
                            });
118
150495
                            Ok(KeyOperation::Set(new_index))
119
                        } else {
120
81463
                            results.push(ModificationResult {
121
81463
                                key: key.to_owned(),
122
81463
                                index: None,
123
81463
                            });
124
81463
                            Ok(KeyOperation::Remove)
125
                        }
126
231958
                    },
127
80475
                    |index, writer| match writer.read_chunk(index.position)? {
128
80475
                        CacheEntry::ArcBytes(buffer) => Ok(Some(buffer.clone())),
129
                        CacheEntry::Decoded(_) => unreachable!(),
130
80475
                    },
131
19618
                    self.reducer().clone(),
132
19618
                ),
133
19618
                None,
134
19618
                writer,
135
19618
            )? {
136
19577
                ChangeResult::Absorb | ChangeResult::Changed | ChangeResult::Unchanged => {}
137
5
                ChangeResult::Remove => {
138
5
                    self.by_id_root.node = BTreeNode::Leaf(vec![]);
139
5
                    self.by_id_root.dirty = true;
140
5
                }
141
36
                ChangeResult::Split => {
142
36
                    self.by_id_root.split_root(&self.reducer().clone());
143
36
                }
144
            }
145
        }
146

            
147
19613
        Ok(results)
148
19613
    }
149
}
150

            
151
impl<EmbeddedIndex> Root for UnversionedTreeRoot<EmbeddedIndex>
152
where
153
    EmbeddedIndex: Clone + super::EmbeddedIndex<ArcBytes<'static>> + 'static,
154
{
155
    const HEADER: PageHeader = PageHeader::UnversionedHeader;
156
    type Value = ArcBytes<'static>;
157
    type Index = UnversionedByIdIndex<EmbeddedIndex, Self::Value>;
158
    type ReducedIndex = ByIdStats<EmbeddedIndex::Reduced>;
159
    type Reducer = ByIdIndexer<EmbeddedIndex::Indexer>;
160

            
161
4
    fn default_with(reducer: Self::Reducer) -> Self {
162
4
        Self {
163
4
            transaction_id: None,
164
4
            by_id_root: BTreeEntry::default(),
165
4
            reducer,
166
4
        }
167
4
    }
168

            
169
77663
    fn reducer(&self) -> &Self::Reducer {
170
77663
        &self.reducer
171
77663
    }
172

            
173
1710
    fn count(&self) -> u64 {
174
1710
        self.by_id_root.stats(self.reducer()).alive_keys
175
1710
    }
176

            
177
12292
    fn dirty(&self) -> bool {
178
12292
        self.by_id_root.dirty
179
12292
    }
180

            
181
26226
    fn initialized(&self) -> bool {
182
26226
        self.transaction_id.is_some()
183
26226
    }
184

            
185
26
    fn initialize_default(&mut self) {
186
26
        self.transaction_id = Some(TransactionId(0));
187
26
    }
188

            
189
    fn serialize(
190
        &mut self,
191
        paged_writer: &mut PagedWriter<'_>,
192
        output: &mut Vec<u8>,
193
    ) -> Result<(), Error> {
194
21323
        output.write_u64::<BigEndian>(
195
21323
            self.transaction_id
196
21323
                .expect("serializing an uninitialized root")
197
21323
                .0,
198
21323
        )?;
199
        // Reserve space for by_id size.
200
21323
        output.write_u32::<BigEndian>(0)?;
201

            
202
21323
        let by_id_size = self.by_id_root.serialize_to(output, paged_writer)?;
203
21323
        let by_id_size = u32::try_from(by_id_size)
204
21323
            .ok()
205
21323
            .ok_or(ErrorKind::Internal(InternalError::HeaderTooLarge))?;
206
21323
        BigEndian::write_u32(&mut output[8..12], by_id_size);
207
21323

            
208
21323
        Ok(())
209
21323
    }
210

            
211
4042
    fn deserialize(mut bytes: ArcBytes<'_>, reducer: Self::Reducer) -> Result<Self, Error> {
212
4042
        let transaction_id = Some(TransactionId(bytes.read_u64::<BigEndian>()?));
213
4042
        let by_id_size = bytes.read_u32::<BigEndian>()? as usize;
214
4042
        if by_id_size != bytes.len() {
215
            return Err(Error::data_integrity(format!(
216
                "Header reported index size {}, but data has {} remaining",
217
                by_id_size,
218
                bytes.len()
219
            )));
220
4042
        };
221

            
222
4042
        let mut by_id_bytes = bytes.read_bytes(by_id_size)?.to_owned();
223

            
224
4042
        let by_id_root = BTreeEntry::deserialize_from(&mut by_id_bytes, None)?;
225

            
226
4042
        Ok(Self {
227
4042
            transaction_id,
228
4042
            by_id_root,
229
4042
            reducer,
230
4042
        })
231
4042
    }
232

            
233
    fn transaction_id(&self) -> TransactionId {
234
        self.transaction_id.unwrap_or_default()
235
    }
236

            
237
19613
    fn modify(
238
19613
        &mut self,
239
19613
        modification: Modification<'_, ArcBytes<'static>, Self::Index>,
240
19613
        writer: &mut PagedWriter<'_>,
241
19613
        max_order: Option<usize>,
242
19613
    ) -> Result<Vec<ModificationResult<Self::Index>>, Error> {
243
19613
        let transaction_id = modification.persistence_mode.transaction_id();
244

            
245
19613
        let results = self.modify_id_root(modification, writer, max_order)?;
246

            
247
        // Only update the transaction id if a new one was specified.
248
19613
        if let Some(transaction_id) = transaction_id {
249
12292
            self.transaction_id = Some(transaction_id);
250
12292
        }
251

            
252
19613
        Ok(results)
253
19613
    }
254

            
255
28163
    fn get_multiple<'keys, KeyEvaluator, KeyReader, Keys>(
256
28163
        &self,
257
28163
        keys: &mut Keys,
258
28163
        key_evaluator: &mut KeyEvaluator,
259
28163
        key_reader: &mut KeyReader,
260
28163
        file: &mut dyn File,
261
28163
        vault: Option<&dyn AnyVault>,
262
28163
        cache: Option<&ChunkCache>,
263
28163
    ) -> Result<(), Error>
264
28163
    where
265
28163
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation,
266
28163
        KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>,
267
28163
        Keys: Iterator<Item = &'keys [u8]>,
268
28163
    {
269
28163
        self.by_id_root
270
28163
            .get_multiple(keys, key_evaluator, key_reader, file, vault, cache)
271
28163
    }
272

            
273
32660
    fn scan<
274
32660
        'keys,
275
32660
        CallerError: Display + Debug,
276
32660
        NodeEvaluator,
277
32660
        KeyRangeBounds,
278
32660
        KeyEvaluator,
279
32660
        DataCallback,
280
32660
    >(
281
32660
        &self,
282
32660
        range: &'keys KeyRangeBounds,
283
32660
        mut args: ScanArgs<
284
32660
            Self::Value,
285
32660
            Self::Index,
286
32660
            Self::ReducedIndex,
287
32660
            CallerError,
288
32660
            NodeEvaluator,
289
32660
            KeyEvaluator,
290
32660
            DataCallback,
291
32660
        >,
292
32660
        file: &mut dyn File,
293
32660
        vault: Option<&dyn AnyVault>,
294
32660
        cache: Option<&ChunkCache>,
295
32660
    ) -> Result<bool, AbortError<CallerError>>
296
32660
    where
297
32660
        NodeEvaluator: FnMut(&ArcBytes<'static>, &Self::ReducedIndex, usize) -> ScanEvaluation,
298
32660
        KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation,
299
32660
        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
300
32660
        DataCallback: FnMut(
301
32660
            ArcBytes<'static>,
302
32660
            &Self::Index,
303
32660
            ArcBytes<'static>,
304
32660
        ) -> Result<(), AbortError<CallerError>>,
305
32660
    {
306
32660
        self.by_id_root
307
32660
            .scan(range, &mut args, file, vault, cache, 0)
308
32660
    }
309

            
310
3420
    fn copy_data_to(
311
3420
        &mut self,
312
3420
        include_nodes: bool,
313
3420
        file: &mut dyn File,
314
3420
        copied_chunks: &mut HashMap<u64, u64>,
315
3420
        writer: &mut PagedWriter<'_>,
316
3420
        vault: Option<&dyn AnyVault>,
317
3420
    ) -> Result<(), Error> {
318
3420
        let mut scratch = Vec::new();
319
3420
        self.by_id_root.copy_data_to(
320
3420
            if include_nodes {
321
1710
                NodeInclusion::IncludeNext
322
            } else {
323
1710
                NodeInclusion::Exclude
324
            },
325
3420
            file,
326
3420
            copied_chunks,
327
3420
            writer,
328
3420
            vault,
329
3420
            &mut scratch,
330
3420
            &mut |_key,
331
                  index: &mut UnversionedByIdIndex<EmbeddedIndex, ArcBytes<'static>>,
332
                  from_file,
333
                  copied_chunks,
334
                  to_file,
335
1700262
                  vault| {
336
1700262
                let new_position =
337
1700262
                    to_file.copy_chunk_from(index.position, from_file, copied_chunks, vault)?;
338

            
339
1700262
                if new_position == index.position {
340
                    // Data is already in the new file
341
558104
                    Ok(false)
342
                } else {
343
1142158
                    index.position = new_position;
344
1142158
                    Ok(true)
345
                }
346
1700262
            },
347
        )?;
348

            
349
3420
        Ok(())
350
3420
    }
351
}