1
use std::{
2
    borrow::Cow,
3
    collections::HashMap,
4
    path::{Path, PathBuf},
5
    sync::{
6
        atomic::{AtomicU64, Ordering},
7
        Arc,
8
    },
9
};
10

            
11
use lru::LruCache;
12
use parking_lot::{Mutex, MutexGuard};
13

            
14
use super::{LogEntry, TransactionHandle, TreeLock, TreeLocks};
15
use crate::transaction::TransactionId;
16

            
17
const UNINITIALIZED_ID: u64 = 0;
18

            
19
/// The transaction log state.
20
151833
#[derive(Clone, Debug)]
21
pub struct State {
22
    state: Arc<ActiveState>,
23
}
24

            
25
#[derive(Debug)]
26
struct ActiveState {
27
    path: PathBuf,
28
    current_transaction_id: AtomicU64,
29
    tree_locks: Mutex<HashMap<Cow<'static, [u8]>, TreeLock>>,
30
    log_position: Mutex<LogPosition>,
31
    known_completed_transactions: Mutex<LruCache<TransactionId, Option<u64>>>,
32
}
33

            
34
/// The active log position information.
35
#[derive(Debug)]
36
pub struct LogPosition {
37
    /// The offset of the writer within the file.
38
    pub file_offset: u64,
39
    /// The last successfully written transaction id.
40
    pub last_written_transaction: TransactionId,
41
}
42

            
43
impl Default for LogPosition {
44
6033
    fn default() -> Self {
45
6033
        Self {
46
6033
            file_offset: 0,
47
6033
            last_written_transaction: TransactionId(UNINITIALIZED_ID),
48
6033
        }
49
6033
    }
50
}
51

            
52
impl State {
53
    /// Creates a new uninitialized state for a transaction log located at `path`.
54
6033
    pub fn from_path(path: impl AsRef<Path>) -> Self {
55
6033
        Self {
56
6033
            state: Arc::new(ActiveState {
57
6033
                path: path.as_ref().to_path_buf(),
58
6033
                tree_locks: Mutex::default(),
59
6033
                current_transaction_id: AtomicU64::new(UNINITIALIZED_ID),
60
6033
                log_position: Mutex::new(LogPosition::default()),
61
6033
                known_completed_transactions: Mutex::new(LruCache::new(1024)),
62
6033
            }),
63
6033
        }
64
6033
    }
65

            
66
6033
    pub(crate) fn initialize(&self, last_written_transaction: TransactionId, log_position: u64) {
67
6033
        let mut state_position = self.state.log_position.lock();
68
6033
        self.state
69
6033
            .current_transaction_id
70
6033
            .compare_exchange(
71
6033
                UNINITIALIZED_ID,
72
6033
                last_written_transaction.0 + 1,
73
6033
                Ordering::SeqCst,
74
6033
                Ordering::SeqCst,
75
6033
            )
76
6033
            .expect("state already initialized");
77
6033
        state_position.file_offset = log_position;
78
6033
        state_position.last_written_transaction = last_written_transaction;
79
6033
    }
80

            
81
    /// Returns the last successfully written transaction id, or None if no
82
    /// transactions have been recorded yet.
83
    #[must_use]
84
15
    pub fn current_transaction_id(&self) -> Option<TransactionId> {
85
15
        let position = self.state.log_position.lock();
86
15
        match position.last_written_transaction {
87
5
            TransactionId(UNINITIALIZED_ID) => None,
88
10
            other => Some(other),
89
        }
90
15
    }
91

            
92
    /// Returns the next transaction id that will be used.
93
    #[must_use]
94
32461
    pub fn next_transaction_id(&self) -> TransactionId {
95
32461
        TransactionId(self.state.current_transaction_id.load(Ordering::SeqCst))
96
32461
    }
97

            
98
    /// Returns the path to the file.
99
    #[must_use]
100
12456
    pub fn path(&self) -> &Path {
101
12456
        &self.state.path
102
12456
    }
103

            
104
    /// Returns the current length of the log.
105
    #[must_use]
106
16435
    pub fn len(&self) -> u64 {
107
16435
        let position = self.state.log_position.lock();
108
16435
        position.file_offset
109
16435
    }
110

            
111
    /// Returns if the log is empty.
112
    #[must_use]
113
5
    pub fn is_empty(&self) -> bool {
114
5
        self.len() == 0
115
5
    }
116

            
117
114305
    fn fetch_tree_locks<'a>(&self, trees: impl Iterator<Item = &'a [u8]>, locks: &mut TreeLocks) {
118
114305
        // Sort the trees being locked to ensure no deadlocks can happen. For
119
114305
        // example, if writer a tries to lock (a, b) and writer b tries to lock
120
114305
        // (b, a), and both acquire their first lock, they would deadlock. By
121
114305
        // sorting, the order of locking will never have dependencies that
122
114305
        // cannot be met by blocking.
123
114305
        let mut trees = trees.collect::<Vec<_>>();
124
114305
        trees.sort_unstable();
125
114305
        let mut tree_locks = self.state.tree_locks.lock();
126
228610
        for tree in trees {
127
114305
            if let Some(lock) = tree_locks.get(&Cow::Borrowed(tree)) {
128
102876
                locks.push(lock.lock());
129
102876
            } else {
130
11429
                let lock = TreeLock::new();
131
11429
                let locked = lock.lock();
132
11429
                tree_locks.insert(Cow::Owned(tree.to_vec()), lock);
133
11429
                locks.push(locked);
134
11429
            }
135
        }
136
114305
    }
137

            
138
    /// Creates a new transaction, exclusively locking `trees`. Will block the thread until the trees can be locked.
139
    #[must_use]
140
114305
    pub fn new_transaction<
141
114305
        'a,
142
114305
        I: IntoIterator<Item = &'a [u8], IntoIter = II>,
143
114305
        II: ExactSizeIterator<Item = &'a [u8]>,
144
114305
    >(
145
114305
        &self,
146
114305
        trees: I,
147
114305
    ) -> TransactionHandle {
148
114305
        let trees = trees.into_iter();
149
114305
        let mut locked_trees = Vec::with_capacity(trees.len());
150
114305
        self.fetch_tree_locks(trees, &mut locked_trees);
151
114305

            
152
114305
        TransactionHandle {
153
114305
            locked_trees,
154
114305
            transaction: LogEntry {
155
114305
                id: TransactionId(
156
114305
                    self.state
157
114305
                        .current_transaction_id
158
114305
                        .fetch_add(1, Ordering::SeqCst),
159
114305
                ),
160
114305
                data: None,
161
114305
            },
162
114305
        }
163
114305
    }
164

            
165
422984
    pub(crate) fn note_transaction_id_status(
166
422984
        &self,
167
422984
        transaction_id: TransactionId,
168
422984
        position: Option<u64>,
169
422984
    ) {
170
422984
        let mut cache = self.state.known_completed_transactions.lock();
171
422984
        cache.put(transaction_id, position);
172
422984
    }
173

            
174
53053
    pub(crate) fn note_transaction_ids_completed(
175
53053
        &self,
176
53053
        transaction_ids: &[(TransactionId, Option<u64>)],
177
53053
    ) {
178
53053
        let mut cache = self.state.known_completed_transactions.lock();
179
158468
        for (id, position) in transaction_ids {
180
105415
            cache.put(*id, *position);
181
105415
        }
182
53053
    }
183

            
184
    /// Returns an option representing whether the transaction id has
185
    /// information cached about it. The inner option contains the cache
186
    /// contents: either a valid position, or None if the transaction ID
187
    /// couldn't be found when it was last searched for.
188
    #[allow(clippy::option_option)]
189
13
    pub(crate) fn transaction_id_position(
190
13
        &self,
191
13
        transaction_id: TransactionId,
192
13
    ) -> Option<Option<u64>> {
193
13
        let mut cache = self.state.known_completed_transactions.lock();
194
13
        cache.get(&transaction_id).copied()
195
13
    }
196
}
197

            
198
impl State {
199
53059
    pub(crate) fn lock_for_write(&self) -> MutexGuard<'_, LogPosition> {
200
53059
        self.state.log_position.lock()
201
53059
    }
202
}