1
use std::marker::PhantomData;
2

            
3
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
4

            
5
use super::{btree::Reducer, BinarySerialization, PagedWriter};
6
use crate::{
7
    error::Error,
8
    tree::{by_sequence::SequenceId, key_entry::PositionIndex},
9
    ArcBytes,
10
};
11

            
12
/// The index stored within [`VersionedTreeRoot::by_id_root`](crate::tree::VersionedTreeRoot::by_id_root).
13
336730
#[derive(Clone, Debug)]
14
pub struct VersionedByIdIndex<EmbeddedIndex: super::EmbeddedIndex<Value>, Value> {
15
    /// The unique sequence id generated when writing the value to the file.
16
    pub sequence_id: SequenceId,
17
    /// The size of the value stored on disk.
18
    pub value_length: u32,
19
    /// The position of the value on disk.
20
    pub position: u64,
21
    /// The embedded index.
22
    pub embedded: EmbeddedIndex,
23

            
24
    _value: PhantomData<Value>,
25
}
26

            
27
impl<EmbeddedIndex, Value> VersionedByIdIndex<EmbeddedIndex, Value>
28
where
29
    EmbeddedIndex: super::EmbeddedIndex<Value>,
30
{
31
    /// Retruns a new index instance.
32
3446518
    pub fn new(
33
3446518
        sequence_id: SequenceId,
34
3446518
        value_length: u32,
35
3446518
        position: u64,
36
3446518
        embedded: EmbeddedIndex,
37
3446518
    ) -> Self {
38
3446518
        Self {
39
3446518
            sequence_id,
40
3446518
            value_length,
41
3446518
            position,
42
3446518
            embedded,
43
3446518
            _value: PhantomData,
44
3446518
        }
45
3446518
    }
46
}
47

            
48
impl<EmbeddedIndex, Value> BinarySerialization for VersionedByIdIndex<EmbeddedIndex, Value>
49
where
50
    EmbeddedIndex: super::EmbeddedIndex<Value>,
51
    Value: Send + Sync,
52
{
53
    fn serialize_to(
54
        &mut self,
55
        writer: &mut Vec<u8>,
56
        _paged_writer: &mut PagedWriter<'_>,
57
    ) -> Result<usize, Error> {
58
2323639
        writer.write_u64::<BigEndian>(self.sequence_id.0)?;
59
2323639
        writer.write_u32::<BigEndian>(self.value_length)?;
60
2323639
        writer.write_u64::<BigEndian>(self.position)?;
61
2323639
        Ok(20 + self.embedded.serialize_to(writer)?)
62
2323639
    }
63

            
64
3213720
    fn deserialize_from(
65
3213720
        reader: &mut ArcBytes<'_>,
66
3213720
        _current_order: Option<usize>,
67
3213720
    ) -> Result<Self, Error> {
68
3213720
        let sequence_id = SequenceId(reader.read_u64::<BigEndian>()?);
69
3213720
        let value_length = reader.read_u32::<BigEndian>()?;
70
3213720
        let position = reader.read_u64::<BigEndian>()?;
71
        Ok(Self::new(
72
3213720
            sequence_id,
73
3213720
            value_length,
74
3213720
            position,
75
3213720
            EmbeddedIndex::deserialize_from(reader)?,
76
        ))
77
3213720
    }
78
}
79

            
80
impl<EmbeddedIndex, Value> PositionIndex for VersionedByIdIndex<EmbeddedIndex, Value>
81
where
82
    EmbeddedIndex: super::EmbeddedIndex<Value>,
83
{
84
87276
    fn position(&self) -> u64 {
85
87276
        self.position
86
87276
    }
87
}
88

            
89
/// The index stored within [`UnversionedTreeRoot::by_id_root`](crate::tree::UnversionedTreeRoot::by_id_root).
90
3058769
#[derive(Clone, Debug)]
91
pub struct UnversionedByIdIndex<EmbeddedIndex: super::EmbeddedIndex<Value>, Value> {
92
    /// The size of the value stored on disk.
93
    pub value_length: u32,
94
    /// The position of the value on disk.
95
    pub position: u64,
96
    /// The embedded index.
97
    pub embedded: EmbeddedIndex,
98

            
99
    _value: PhantomData<Value>,
100
}
101

            
102
impl<EmbeddedIndex, Value> UnversionedByIdIndex<EmbeddedIndex, Value>
103
where
104
    EmbeddedIndex: super::EmbeddedIndex<Value>,
105
{
106
    /// Retruns a new index instance.
107
5808568
    pub fn new(value_length: u32, position: u64, embedded: EmbeddedIndex) -> Self {
108
5808568
        Self {
109
5808568
            value_length,
110
5808568
            position,
111
5808568
            embedded,
112
5808568
            _value: PhantomData,
113
5808568
        }
114
5808568
    }
115
}
116

            
117
impl<EmbeddedIndex, Value> BinarySerialization for UnversionedByIdIndex<EmbeddedIndex, Value>
118
where
119
    EmbeddedIndex: super::EmbeddedIndex<Value>,
120
    Value: Send + Sync,
121
{
122
    fn serialize_to(
123
        &mut self,
124
        writer: &mut Vec<u8>,
125
        _paged_writer: &mut PagedWriter<'_>,
126
    ) -> Result<usize, Error> {
127
1886446
        writer.write_u32::<BigEndian>(self.value_length)?;
128
1886446
        writer.write_u64::<BigEndian>(self.position)?;
129
1886446
        Ok(12 + self.embedded.serialize_to(writer)?)
130
1886446
    }
131

            
132
5658073
    fn deserialize_from(
133
5658073
        reader: &mut ArcBytes<'_>,
134
5658073
        _current_order: Option<usize>,
135
5658073
    ) -> Result<Self, Error> {
136
5658073
        let value_length = reader.read_u32::<BigEndian>()?;
137
5658073
        let position = reader.read_u64::<BigEndian>()?;
138
        Ok(Self::new(
139
5658073
            value_length,
140
5658073
            position,
141
5658073
            EmbeddedIndex::deserialize_from(reader)?,
142
        ))
143
5658073
    }
144
}
145

            
146
impl<EmbeddedIndex, Value> PositionIndex for UnversionedByIdIndex<EmbeddedIndex, Value>
147
where
148
    EmbeddedIndex: super::EmbeddedIndex<Value>,
149
{
150
78493
    fn position(&self) -> u64 {
151
78493
        self.position
152
78493
    }
153
}
154

            
155
/// The reduced index of both [`VersionedByIdIndex`] and [`UnversionedByIdIndex`].
///
/// Holds aggregate key counts and the total size of alive values for a set
/// of by-id entries, plus the reduction of their embedded indexes.
#[derive(Debug, Clone)]
pub struct ByIdStats<EmbeddedStats> {
    /// The number of keys that have values stored within them.
    pub alive_keys: u64,
    /// The number of keys that no longer have values stored within them.
    pub deleted_keys: u64,
    /// The total number of bytes stored on disk associated with currently-alive values.
    pub total_indexed_bytes: u64,
    /// The embedded statistics.
    pub embedded: EmbeddedStats,
}
167

            
168
impl<EmbeddedStats> ByIdStats<EmbeddedStats> {
169
    /// Returns the total number of keys regardless of whether data is stored within them.
170
    #[must_use]
171
40066
    pub const fn total_keys(&self) -> u64 {
172
40066
        self.alive_keys + self.deleted_keys
173
40066
    }
174
}
175

            
176
impl<EmbeddedStats> BinarySerialization for ByIdStats<EmbeddedStats>
177
where
178
    EmbeddedStats: super::Serializable,
179
{
180
    fn serialize_to(
181
        &mut self,
182
        writer: &mut Vec<u8>,
183
        _paged_writer: &mut PagedWriter<'_>,
184
    ) -> Result<usize, Error> {
185
4165916
        writer.write_u64::<BigEndian>(self.alive_keys)?;
186
4165916
        writer.write_u64::<BigEndian>(self.deleted_keys)?;
187
4165916
        writer.write_u64::<BigEndian>(self.total_indexed_bytes)?;
188
4165916
        Ok(24 + self.embedded.serialize_to(writer)?)
189
4165916
    }
190

            
191
5546130
    fn deserialize_from(
192
5546130
        reader: &mut ArcBytes<'_>,
193
5546130
        _current_order: Option<usize>,
194
5546130
    ) -> Result<Self, Error> {
195
5546130
        let alive_keys = reader.read_u64::<BigEndian>()?;
196
5546130
        let deleted_keys = reader.read_u64::<BigEndian>()?;
197
5546130
        let total_indexed_bytes = reader.read_u64::<BigEndian>()?;
198
        Ok(Self {
199
5546130
            alive_keys,
200
5546130
            deleted_keys,
201
5546130
            total_indexed_bytes,
202
5546130
            embedded: EmbeddedStats::deserialize_from(reader)?,
203
        })
204
5546130
    }
205
}
206

            
207
/// Indexes and reduces [`VersionedByIdIndex`] and [`UnversionedByIdIndex`].
///
/// Wraps the indexer of an [`EmbeddedIndex`][super::EmbeddedIndex], which is
/// used to reduce the embedded portion of each entry.
#[derive(Debug, Default, Clone)]
pub struct ByIdIndexer<EmbeddedIndexer>(pub EmbeddedIndexer);
211

            
212
impl<EmbeddedIndexer, EmbeddedIndex, EmbeddedStats, Value>
213
    Reducer<VersionedByIdIndex<EmbeddedIndex, Value>, ByIdStats<EmbeddedStats>>
214
    for ByIdIndexer<EmbeddedIndexer>
215
where
216
    EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
217
    EmbeddedIndex: super::EmbeddedIndex<Value, Indexer = EmbeddedIndexer, Reduced = EmbeddedStats>,
218
    Value: Send + Sync + 'static,
219
{
220
134186
    fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ByIdStats<EmbeddedStats>
221
134186
    where
222
134186
        EmbeddedIndex: 'a,
223
134186
        Indexes: IntoIterator<
224
134186
                Item = &'a VersionedByIdIndex<EmbeddedIndex, Value>,
225
134186
                IntoIter = IndexesIter,
226
134186
            > + ExactSizeIterator,
227
134186
        IndexesIter: Iterator<Item = &'a VersionedByIdIndex<EmbeddedIndex, Value>>
228
134186
            + ExactSizeIterator
229
134186
            + Clone,
230
134186
    {
231
134186
        self.reduce(indexes)
232
134186
    }
233

            
234
88007
    fn rereduce<
235
88007
        'a,
236
88007
        ReducedIndexes: IntoIterator<Item = &'a ByIdStats<EmbeddedStats>, IntoIter = ReducedIndexesIter>
237
88007
            + ExactSizeIterator,
238
88007
        ReducedIndexesIter: Iterator<Item = &'a ByIdStats<EmbeddedStats>> + ExactSizeIterator + Clone,
239
88007
    >(
240
88007
        &self,
241
88007
        values: ReducedIndexes,
242
88007
    ) -> ByIdStats<EmbeddedStats>
243
88007
    where
244
88007
        Self: 'a,
245
88007
        EmbeddedStats: 'a,
246
88007
    {
247
88007
        self.rereduce(values)
248
88007
    }
249
}
250

            
251
impl<EmbeddedIndexer, EmbeddedIndex, EmbeddedStats, Value>
252
    Reducer<UnversionedByIdIndex<EmbeddedIndex, Value>, ByIdStats<EmbeddedStats>>
253
    for ByIdIndexer<EmbeddedIndexer>
254
where
255
    EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
256
    EmbeddedIndex: super::EmbeddedIndex<Value, Indexer = EmbeddedIndexer, Reduced = EmbeddedStats>,
257
    Value: 'static,
258
{
259
148637
    fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ByIdStats<EmbeddedStats>
260
148637
    where
261
148637
        EmbeddedIndex: 'a,
262
148637
        Indexes: IntoIterator<
263
148637
                Item = &'a UnversionedByIdIndex<EmbeddedIndex, Value>,
264
148637
                IntoIter = IndexesIter,
265
148637
            > + ExactSizeIterator,
266
148637
        IndexesIter: Iterator<Item = &'a UnversionedByIdIndex<EmbeddedIndex, Value>>
267
148637
            + ExactSizeIterator
268
148637
            + Clone,
269
148637
    {
270
148637
        self.reduce(indexes)
271
148637
    }
272

            
273
116822
    fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(
274
116822
        &self,
275
116822
        values: ReducedIndexes,
276
116822
    ) -> ByIdStats<EmbeddedStats>
277
116822
    where
278
116822
        Self: 'a,
279
116822
        EmbeddedStats: 'a,
280
116822
        ReducedIndexes: IntoIterator<Item = &'a ByIdStats<EmbeddedStats>, IntoIter = ReducedIndexesIter>
281
116822
            + ExactSizeIterator,
282
116822
        ReducedIndexesIter:
283
116822
            Iterator<Item = &'a ByIdStats<EmbeddedStats>> + ExactSizeIterator + Clone,
284
116822
    {
285
116822
        self.rereduce(values)
286
116822
    }
287
}
288

            
289
impl<EmbeddedIndexer> ByIdIndexer<EmbeddedIndexer> {
290
282823
    fn reduce<'a, EmbeddedIndex, EmbeddedStats, Id, Indexes, IndexesIter, Value>(
291
282823
        &self,
292
282823
        values: Indexes,
293
282823
    ) -> ByIdStats<EmbeddedStats>
294
282823
    where
295
282823
        Id: IdIndex<EmbeddedIndex> + 'a,
296
282823
        EmbeddedIndex:
297
282823
            super::EmbeddedIndex<Value, Indexer = EmbeddedIndexer, Reduced = EmbeddedStats> + 'a,
298
282823
        Indexes: IntoIterator<Item = &'a Id, IntoIter = IndexesIter> + ExactSizeIterator,
299
282823
        IndexesIter: Iterator<Item = &'a Id> + ExactSizeIterator + Clone,
300
282823
        EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
301
282823
    {
302
282823
        let values = values.into_iter();
303
282823
        let (alive_keys, deleted_keys, total_indexed_bytes) = values
304
282823
            .clone()
305
7209491
            .map(|index| {
306
7209491
                if index.position() > 0 {
307
                    // Alive key
308
6325811
                    (1, 0, u64::from(index.value_size()))
309
                } else {
310
                    // Deleted
311
883680
                    (0, 1, 0)
312
                }
313
7209491
            })
314
282823
            .reduce(
315
6926737
                |(total_alive, total_deleted, total_size), (alive, deleted, size)| {
316
6926737
                    (
317
6926737
                        total_alive + alive,
318
6926737
                        total_deleted + deleted,
319
6926737
                        total_size + size,
320
6926737
                    )
321
6926737
                },
322
282823
            )
323
282823
            .unwrap_or_default();
324
282823
        ByIdStats {
325
282823
            alive_keys,
326
282823
            deleted_keys,
327
282823
            total_indexed_bytes,
328
282823
            embedded: self.0.reduce(values.map(IdIndex::embedded)),
329
282823
        }
330
282823
    }
331

            
332
204829
    fn rereduce<'a, ReducedIndexes, ReducedIndexesIter, EmbeddedStats, EmbeddedIndex>(
333
204829
        &self,
334
204829
        values: ReducedIndexes,
335
204829
    ) -> ByIdStats<EmbeddedStats>
336
204829
    where
337
204829
        EmbeddedStats: 'a,
338
204829
        ReducedIndexes: IntoIterator<Item = &'a ByIdStats<EmbeddedStats>, IntoIter = ReducedIndexesIter>
339
204829
            + ExactSizeIterator,
340
204829
        ReducedIndexesIter:
341
204829
            Iterator<Item = &'a ByIdStats<EmbeddedStats>> + ExactSizeIterator + Clone,
342
204829
        EmbeddedIndexer: Reducer<EmbeddedIndex, EmbeddedStats>,
343
204829
    {
344
204829
        let values = values.into_iter();
345
204829
        ByIdStats {
346
4190788
            alive_keys: values.clone().map(|v| v.alive_keys).sum(),
347
4190788
            deleted_keys: values.clone().map(|v| v.deleted_keys).sum(),
348
4190788
            total_indexed_bytes: values.clone().map(|v| v.total_indexed_bytes).sum(),
349
204829
            // TODO change this to an iterator
350
204829
            embedded: self.0.rereduce(values.map(|v| &v.embedded)),
351
204829
        }
352
204829
    }
353
}
354

            
355
/// Accessors shared by [`VersionedByIdIndex`] and [`UnversionedByIdIndex`],
/// letting [`ByIdIndexer`] reduce either kind of entry with one
/// implementation.
pub trait IdIndex<EmbeddedIndex> {
    /// Returns the size of the value stored on disk.
    fn value_size(&self) -> u32;
    /// Returns the position of the value on disk. When reducing, the by-id
    /// indexer treats a position of 0 as a deleted key.
    fn position(&self) -> u64;
    /// Returns the embedded index stored alongside this entry.
    fn embedded(&self) -> &EmbeddedIndex;
}
360

            
361
impl<EmbeddedIndex, Value> IdIndex<EmbeddedIndex> for UnversionedByIdIndex<EmbeddedIndex, Value>
362
where
363
    EmbeddedIndex: super::EmbeddedIndex<Value>,
364
{
365
4620188
    fn value_size(&self) -> u32 {
366
4620188
        self.value_length
367
4620188
    }
368

            
369
4620188
    fn position(&self) -> u64 {
370
4620188
        self.position
371
4620188
    }
372

            
373
    fn embedded(&self) -> &EmbeddedIndex {
374
        &self.embedded
375
    }
376
}
377

            
378
impl<EmbeddedIndex, Value> IdIndex<EmbeddedIndex> for VersionedByIdIndex<EmbeddedIndex, Value>
379
where
380
    EmbeddedIndex: super::EmbeddedIndex<Value>,
381
{
382
1705623
    fn value_size(&self) -> u32 {
383
1705623
        self.value_length
384
1705623
    }
385

            
386
2589303
    fn position(&self) -> u64 {
387
2589303
        self.position
388
2589303
    }
389

            
390
    fn embedded(&self) -> &EmbeddedIndex {
391
        &self.embedded
392
    }
393
}