1
use std::fmt::Display;
2

            
3
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
4

            
5
use crate::{
6
    error::Error,
7
    tree::{btree::Reducer, key_entry::PositionIndex, BinarySerialization, PagedWriter},
8
    ArcBytes, ErrorKind,
9
};
10

            
11
/// Identifies one individual modification made to a key inside a versioned
/// tree file. Ids are ordered numerically by their wrapped `u64`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SequenceId(pub u64);
14

            
15
impl From<u64> for SequenceId {
16
    fn from(id: u64) -> Self {
17
        Self(id)
18
    }
19
}
20

            
21
impl From<SequenceId> for u64 {
22
    fn from(id: SequenceId) -> Self {
23
        id.0
24
    }
25
}
26

            
27
impl SequenceId {
28
    #[must_use]
29
106164
    pub(crate) const fn valid(self) -> bool {
30
106164
        self.0 > 0
31
106164
    }
32

            
33
    /// Returns the nexxt sequence id after `self`.
34
232798
    pub fn next_sequence(&self) -> Option<Self> {
35
232798
        self.0.checked_add(1).map(Self)
36
232798
    }
37
}
38

            
39
impl Display for SequenceId {
40
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41
        self.0.fmt(f)
42
    }
43
}
44

            
45
/// The index stored within [`VersionedTreeRoot::by_sequence_root`](crate::tree::VersionedTreeRoot::by_sequence_root).
46
380374
#[derive(Clone, Debug)]
47
pub struct BySequenceIndex<Embedded> {
48
    /// The key associated with this sequence id.
49
    pub key: ArcBytes<'static>,
50
    /// The previous sequence of this key.
51
    pub last_sequence: Option<SequenceId>,
52
    /// The size of the value stored on disk.
53
    pub value_length: u32,
54
    /// The position of the value on disk.
55
    pub position: u64,
56
    /// The embeded index at the time of the sequence being written. This value
57
    /// is always present on data written from v0.6.0 onwards. If the tree being
58
    /// used was created after v0.6.0 or has had compaction run on v0.6.0, it is
59
    /// safe to unwrap this value.
60
    pub embedded: Option<Embedded>,
61
}
62

            
63
impl<Embedded> BinarySerialization for BySequenceIndex<Embedded>
64
where
65
    Embedded: super::EmbeddedIndex<ArcBytes<'static>>,
66
{
67
393895
    fn serialize_to(
68
393895
        &mut self,
69
393895
        writer: &mut Vec<u8>,
70
393895
        _paged_writer: &mut PagedWriter<'_>,
71
393895
    ) -> Result<usize, Error> {
72
393895
        let mut bytes_written = 0;
73
393895
        writer.write_u32::<BigEndian>(self.value_length)?;
74
393895
        bytes_written += 4;
75
393895
        writer.write_u64::<BigEndian>(self.position)?;
76
393895
        bytes_written += 8;
77
393895
        writer.write_u64::<BigEndian>(self.last_sequence.unwrap_or(SequenceId(0)).0)?;
78
393895
        bytes_written += 8;
79

            
80
393895
        let key_length = u16::try_from(self.key.len()).map_err(|_| ErrorKind::KeyTooLarge)?;
81
393895
        writer.write_u16::<BigEndian>(key_length)?;
82
393895
        bytes_written += 2;
83
393895
        writer.extend_from_slice(&self.key);
84
393895
        bytes_written += key_length as usize;
85

            
86
393895
        if let Some(embedded) = &self.embedded {
87
377016
            bytes_written += embedded.serialize_to(writer)?;
88
16879
        }
89

            
90
393895
        Ok(bytes_written)
91
393895
    }
92

            
93
75804
    fn deserialize_from(
94
75804
        reader: &mut ArcBytes<'_>,
95
75804
        _current_order: Option<usize>,
96
75804
    ) -> Result<Self, Error> {
97
75804
        let value_length = reader.read_u32::<BigEndian>()?;
98
75804
        let position = reader.read_u64::<BigEndian>()?;
99
75804
        let last_sequence = SequenceId(reader.read_u64::<BigEndian>()?);
100
75804
        let key_length = reader.read_u16::<BigEndian>()? as usize;
101
75804
        if key_length > reader.len() {
102
            return Err(Error::data_integrity(format!(
103
                "key length {} found but only {} bytes remaining",
104
                key_length,
105
                reader.len()
106
            )));
107
75804
        }
108
75804
        let key = reader.read_bytes(key_length)?.into_owned();
109

            
110
75804
        let embedded = (!reader.is_empty())
111
75804
            .then(|| Embedded::deserialize_from(reader))
112
75804
            .transpose()?;
113

            
114
        Ok(Self {
115
75804
            key,
116
75804
            last_sequence: if last_sequence.valid() {
117
23596
                Some(last_sequence)
118
            } else {
119
52208
                None
120
            },
121
75804
            value_length,
122
75804
            position,
123
75804
            embedded,
124
        })
125
75804
    }
126
}
127

            
128
impl<Embedded> PositionIndex for BySequenceIndex<Embedded> {
    /// The on-disk position of the value this entry describes.
    fn position(&self) -> u64 {
        let Self { position, .. } = self;
        *position
    }
}
133

            
134
/// Aggregate statistics produced by reducing [`BySequenceIndex`] entries.
#[derive(Debug, Clone)]
pub struct BySequenceStats {
    /// How many sequence entries are covered by this reduction.
    pub total_sequences: u64,
}
140

            
141
impl BinarySerialization for BySequenceStats {
142
    fn serialize_to(
143
        &mut self,
144
        writer: &mut Vec<u8>,
145
        _paged_writer: &mut PagedWriter<'_>,
146
    ) -> Result<usize, Error> {
147
198686
        writer.write_u64::<BigEndian>(self.total_sequences)?;
148
198686
        Ok(8)
149
198686
    }
150

            
151
55016
    fn deserialize_from(
152
55016
        reader: &mut ArcBytes<'_>,
153
55016
        _current_order: Option<usize>,
154
55016
    ) -> Result<Self, Error> {
155
55016
        let number_of_records = reader.read_u64::<BigEndian>()?;
156
55016
        Ok(Self {
157
55016
            total_sequences: number_of_records,
158
55016
        })
159
55016
    }
160
}
161

            
162
/// The [`Reducer`] used for [`BySequenceIndex`] entries. It produces
/// [`BySequenceStats`] holding the number of sequence entries beneath a node.
#[derive(Clone, Default, Debug)]
pub struct BySequenceReducer;
164

            
165
impl<Embedded> Reducer<BySequenceIndex<Embedded>, BySequenceStats> for BySequenceReducer {
166
258379
    fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> BySequenceStats
167
258379
    where
168
258379
        BySequenceIndex<Embedded>: 'a,
169
258379
        Indexes: IntoIterator<Item = &'a BySequenceIndex<Embedded>, IntoIter = IndexesIter>
170
258379
            + ExactSizeIterator,
171
258379
        IndexesIter: Iterator<Item = &'a BySequenceIndex<Embedded>> + ExactSizeIterator + Clone,
172
258379
    {
173
258379
        BySequenceStats {
174
258379
            total_sequences: indexes.len() as u64,
175
258379
        }
176
258379
    }
177

            
178
415352
    fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(
179
415352
        &self,
180
415352
        values: ReducedIndexes,
181
415352
    ) -> BySequenceStats
182
415352
    where
183
415352
        Self: 'a,
184
415352
        ReducedIndexes: IntoIterator<Item = &'a BySequenceStats, IntoIter = ReducedIndexesIter>
185
415352
            + ExactSizeIterator,
186
415352
        ReducedIndexesIter: Iterator<Item = &'a BySequenceStats> + ExactSizeIterator,
187
415352
    {
188
415352
        BySequenceStats {
189
12094396
            total_sequences: values.into_iter().map(|v| v.total_sequences).sum(),
190
415352
        }
191
415352
    }
192
}