1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::{
    fmt::Debug,
    ops::{Deref, DerefMut},
};

use super::btree_entry::KeyOperation;
use crate::{error::Error, transaction::TransactionId, ArcBytes, ErrorKind};

/// A tree modification: an [`Operation`] applied to a list of keys.
#[derive(Debug)]
pub struct Modification<'a, T, Index> {
    /// How durably this change is persisted. When the mode is
    /// [`PersistenceMode::Transactional`], it also carries the transaction ID
    /// to store with this change.
    pub persistence_mode: PersistenceMode,
    /// The keys to operate upon.
    pub keys: Vec<ArcBytes<'a>>,
    /// The operation to perform on the keys.
    pub operation: Operation<'a, T, Index>,
}

impl<'a, T, Index> Modification<'a, T, Index> {
    pub(crate) fn reverse(&mut self) -> Result<(), Error> {
        if self.keys.windows(2).all(|w| w[0] < w[1]) {
            self.keys.reverse();
            if let Operation::SetEach(values) = &mut self.operation {
                values.reverse();
            }
            Ok(())
        } else {
            Err(Error::from(ErrorKind::KeysNotOrdered))
        }
    }
}

/// Controls the persistence guarantees of write operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum PersistenceMode {
    /// Transactional writes are always fully synchronized, which means that
    /// data written is persisted to the best of the abilities provided by the
    /// operating system hosting Nebari. Additionally, the written data is
    /// tagged as belonging to this `TransactionId`.
    Transactional(TransactionId),
    /// Data written is persisted to the best of the abilities provided by the
    /// operating system hosting Nebari.
    Sync,
    /// Data written is not buffered by Nebari, but may still be in
    /// operating-system level caches. In the event of a power loss or sudden
    /// application failure, the data written may not be available when the
    /// tree is reopened. Nebari will automatically recover to the last write
    /// that was fully synchronized to disk, but writing using this persistence
    /// mode moves the control of synchronization from Nebari to the operating
    /// system.
    Flush,
}

impl PersistenceMode {
    /// Returns the transaction ID associated with this mode.
    ///
    /// Only [`PersistenceMode::Transactional`] carries an ID. [`Sync`](Self::Sync)
    /// and [`Flush`](Self::Flush) yield `None`.
    #[must_use]
    pub const fn transaction_id(&self) -> Option<TransactionId> {
        match self {
            Self::Transactional(id) => Some(*id),
            Self::Sync | Self::Flush => None,
        }
    }

    /// Reports whether a write in this mode must be fully synchronized before
    /// control returns to the caller.
    ///
    /// Only [`Flush`](Self::Flush) leaves synchronization to the operating
    /// system.
    #[must_use]
    pub const fn should_synchronize(&self) -> bool {
        match self {
            Self::Transactional(_) | Self::Sync => true,
            Self::Flush => false,
        }
    }
}

impl From<TransactionId> for PersistenceMode {
    fn from(id: TransactionId) -> Self {
        Self::Transactional(id)
    }
}

/// Maps `Some(id)` to [`PersistenceMode::Transactional`] and `None` to
/// [`PersistenceMode::Sync`].
impl From<Option<TransactionId>> for PersistenceMode {
    fn from(maybe_transactional: Option<TransactionId>) -> Self {
        match maybe_transactional {
            Some(id) => Self::Transactional(id),
            None => Self::Sync,
        }
    }
}

/// An operation that is performed on a set of keys.
pub enum Operation<'a, T, Index> {
    /// Sets all keys to the value.
    Set(T),
    /// Sets each key to the corresponding entry in this value. The number of
    /// keys must match the number of values.
    SetEach(Vec<T>),
    /// Removes the keys.
    Remove,
    /// Executes the `CompareSwap` callback. It receives a key, that key's
    /// index (or `None` if not present), and its current value (or `None` if
    /// not present), and returns a [`KeyOperation`] describing what to do.
    CompareSwap(CompareSwap<'a, T, Index>),
}

/// Formats an [`Operation`] as its variant name plus any stored values.
impl<T: Debug, Index> Debug for Operation<'_, T, Index> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Set(value) => f.debug_tuple("Set").field(value).finish(),
            Self::SetEach(values) => f.debug_tuple("SetEach").field(values).finish(),
            Self::Remove => f.write_str("Remove"),
            // The callback is an opaque closure, so only the variant name is shown.
            Self::CompareSwap(_) => f.debug_tuple("CompareSwap").finish(),
        }
    }
}

/// A function that is allowed to check the current value of a key and
/// determine how to operate on it.
///
/// Parameters, in order:
/// 1. the key being operated on;
/// 2. the key's index, if present;
/// 3. the key's current value, if present.
///
/// The returned [`KeyOperation`] describes what to do with the key.
pub type CompareSwapFn<'a, T, Index> =
    dyn FnMut(&ArcBytes<'a>, Option<&Index>, Option<T>) -> KeyOperation<T> + 'a;

/// Holds a mutable borrow of a [`CompareSwapFn`] callback so it can be stored
/// inside [`Operation::CompareSwap`].
pub struct CompareSwap<'a, T, Index>(&'a mut CompareSwapFn<'a, T, Index>);

impl<'a, T, Index> CompareSwap<'a, T, Index> {
    /// Wraps `callback`, erasing its concrete closure type behind a
    /// [`CompareSwapFn`] trait object.
    pub fn new<F>(callback: &'a mut F) -> Self
    where
        F: FnMut(&ArcBytes<'_>, Option<&Index>, Option<T>) -> KeyOperation<T> + 'a,
    {
        // Unsizing coercion: `&mut F` becomes `&mut dyn FnMut(..)`.
        let erased: &'a mut CompareSwapFn<'a, T, Index> = callback;
        Self(erased)
    }
}

/// Prints a fixed placeholder, since the wrapped closure cannot be inspected.
impl<T, Index> Debug for CompareSwap<'_, T, Index> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "CompareSwap(dyn FnMut)")
    }
}

/// Exposes the wrapped callback by shared reference.
impl<'a, T, Index> Deref for CompareSwap<'a, T, Index> {
    type Target = CompareSwapFn<'a, T, Index>;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

/// Exposes the wrapped callback mutably so that it can be invoked (it is an `FnMut`).
impl<'a, T, Index> DerefMut for CompareSwap<'a, T, Index> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}