1
use std::{
2
    fmt::Debug,
3
    ops::{Deref, DerefMut},
4
};
5

            
6
use super::btree::KeyOperation;
7
use crate::{error::Error, transaction::TransactionId, ArcBytes, ErrorKind};
8

            
9
/// A tree modification.
10
#[derive(Debug)]
11
pub struct Modification<'a, T, Index> {
12
    /// The transaction ID to store with this change.
13
    pub persistence_mode: PersistenceMode,
14
    /// The keys to operate upon.
15
    pub keys: Vec<ArcBytes<'a>>,
16
    /// The operation to perform on the keys.
17
    pub operation: Operation<'a, T, Index>,
18
}
19

            
20
impl<'a, T, Index> Modification<'a, T, Index> {
21
    /// Prepares this modification for efficient operation, and ensures that the
22
    /// keys are properly ordered.
23
    ///
24
    /// After calling this function, the keys and values (if applicable) are
25
    /// reversed so that keys and values can be removed by calling [`Vec::pop`].
26
60519
    pub fn prepare(&mut self) -> Result<(), Error> {
27
637059
        if self.keys.windows(2).all(|w| w[0] < w[1]) {
28
60519
            self.keys.reverse();
29
60519
            if let Operation::SetEach(values) = &mut self.operation {
30
20453
                values.reverse();
31
40066
            }
32
60519
            Ok(())
33
        } else {
34
            Err(Error::from(ErrorKind::KeysNotOrdered))
35
        }
36
60519
    }
37
}
38

            
39
/// Controls the persistence guarantees of write operations.
40
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
41
pub enum PersistenceMode {
42
    /// Transactional writes are always fully synchronized, which means that
43
    /// data written is persisted to the best of the abilities provided by the
44
    /// operating system hosting Nebari. Additionally, the written data is
45
    /// tagged as belonging to this `Transactionid`.
46
    Transactional(TransactionId),
47
    /// Data written is persisted to the best of the abilities provided by the
48
    /// operating system hosting Nebari.
49
    Sync,
50
    /// Data written is not buffered by Nebari, but may still be in
51
    /// operating-system level caches. In the event of a power loss or sudden
52
    /// application failure, the data written may not be available when the tree
53
    /// is reopened. Nebari will automatically recover to the last write that
54
    /// was fully synchronized to disk, but writing using this persistence mode
55
    /// moves the control of synchronization from Nebari to the operating
56
    /// system.
57
    Flush,
58
}
59

            
60
impl PersistenceMode {
61
    /// Returns the transaction ID for writes performed with this mode, if applicable.
62
    #[must_use]
63
    pub const fn transaction_id(&self) -> Option<TransactionId> {
64
80132
        if let Self::Transactional(id) = self {
65
49188
            Some(*id)
66
        } else {
67
30944
            None
68
        }
69
80132
    }
70

            
71
    /// Returns true if writes should be fully synchronized before control is
72
    /// returned to the caller.
73
    #[must_use]
74
15472
    pub const fn should_synchronize(&self) -> bool {
75
15472
        matches!(self, Self::Transactional(_) | Self::Sync)
76
15472
    }
77
}
78

            
79
impl From<TransactionId> for PersistenceMode {
80
8192
    fn from(id: TransactionId) -> Self {
81
8192
        Self::Transactional(id)
82
8192
    }
83
}
84

            
85
impl From<Option<TransactionId>> for PersistenceMode {
    /// Converts an optional transaction ID into a persistence mode:
    /// `Some(id)` becomes [`PersistenceMode::Transactional`], and `None`
    /// becomes [`PersistenceMode::Sync`].
    fn from(maybe_transactional: Option<TransactionId>) -> Self {
        match maybe_transactional {
            Some(id) => Self::Transactional(id),
            None => Self::Sync,
        }
    }
}
90

            
91
/// An operation that is performed on a set of keys.
92
pub enum Operation<'a, T, Index> {
93
    /// Sets all keys to the value.
94
    Set(T),
95
    /// Sets each key to the corresponding entry in this value. The number of
96
    /// keys must match the number of values.
97
    SetEach(Vec<T>),
98
    /// Removes the keys.
99
    Remove,
100
    /// Executes the `CompareSwap`. The original value (or `None` if not
101
    /// present) is the only argument.
102
    CompareSwap(CompareSwap<'a, T, Index>),
103
}
104

            
105
impl<'a, T: Debug, Index> Debug for Operation<'a, T, Index> {
106
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107
        match self {
108
            Self::Set(arg0) => f.debug_tuple("Set").field(arg0).finish(),
109
            Self::SetEach(arg0) => f.debug_tuple("SetEach").field(arg0).finish(),
110
            Self::Remove => write!(f, "Remove"),
111
            Self::CompareSwap(_) => f.debug_tuple("CompareSwap").finish(),
112
        }
113
    }
114
}
115

            
116
/// A function that is allowed to check the current value of a key and determine
117
/// how to operate on it. The first parameter is the key, and the second
118
/// parameter is the current value, if present.
119
pub type CompareSwapFn<'a, T, Index> =
120
    dyn FnMut(&ArcBytes<'a>, Option<&Index>, Option<T>) -> KeyOperation<T> + 'a;
121

            
122
/// A wrapper for a [`CompareSwapFn`].
123
pub struct CompareSwap<'a, T, Index>(&'a mut CompareSwapFn<'a, T, Index>);
124

            
125
impl<'a, T, Index> CompareSwap<'a, T, Index> {
126
    /// Returns a new wrapped callback.
127
8260
    pub fn new<F: FnMut(&ArcBytes<'_>, Option<&Index>, Option<T>) -> KeyOperation<T> + 'a>(
128
8260
        callback: &'a mut F,
129
8260
    ) -> Self {
130
8260
        Self(callback)
131
8260
    }
132
}
133

            
134
impl<'a, T, Index> Debug for CompareSwap<'a, T, Index> {
135
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136
        f.write_str("CompareSwap(dyn FnMut)")
137
    }
138
}
139

            
140
impl<'a, T, Index> Deref for CompareSwap<'a, T, Index> {
141
    type Target = CompareSwapFn<'a, T, Index>;
142

            
143
    fn deref(&self) -> &Self::Target {
144
        self.0
145
    }
146
}
147

            
148
impl<'a, T, Index> DerefMut for CompareSwap<'a, T, Index> {
149
360966
    fn deref_mut(&mut self) -> &mut Self::Target {
150
360966
        self.0
151
360966
    }
152
}