1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
use std::{
borrow::Cow,
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use lru::LruCache;
use parking_lot::{Mutex, MutexGuard};
use super::{LogEntry, TransactionHandle, TreeLock, TreeLocks};
use crate::transaction::TransactionId;
const UNINITIALIZED_ID: u64 = 0;
#[derive(Clone, Debug)]
pub struct State {
state: Arc<ActiveState>,
}
#[derive(Debug)]
struct ActiveState {
path: PathBuf,
current_transaction_id: AtomicU64,
tree_locks: Mutex<HashMap<Cow<'static, [u8]>, TreeLock>>,
log_position: Mutex<LogPosition>,
known_completed_transactions: Mutex<LruCache<TransactionId, Option<u64>>>,
}
#[derive(Debug)]
pub struct LogPosition {
pub file_offset: u64,
pub last_written_transaction: TransactionId,
}
impl Default for LogPosition {
fn default() -> Self {
Self {
file_offset: 0,
last_written_transaction: TransactionId(UNINITIALIZED_ID),
}
}
}
impl State {
pub fn from_path(path: impl AsRef<Path>) -> Self {
Self {
state: Arc::new(ActiveState {
path: path.as_ref().to_path_buf(),
tree_locks: Mutex::default(),
current_transaction_id: AtomicU64::new(UNINITIALIZED_ID),
log_position: Mutex::new(LogPosition::default()),
known_completed_transactions: Mutex::new(LruCache::new(1024)),
}),
}
}
pub(crate) fn initialize(&self, last_written_transaction: TransactionId, log_position: u64) {
let mut state_position = self.state.log_position.lock();
self.state
.current_transaction_id
.compare_exchange(
UNINITIALIZED_ID,
last_written_transaction.0 + 1,
Ordering::SeqCst,
Ordering::SeqCst,
)
.expect("state already initialized");
state_position.file_offset = log_position;
state_position.last_written_transaction = last_written_transaction;
}
#[must_use]
pub fn current_transaction_id(&self) -> Option<TransactionId> {
let position = self.state.log_position.lock();
match position.last_written_transaction {
TransactionId(UNINITIALIZED_ID) => None,
other => Some(other),
}
}
#[must_use]
pub fn next_transaction_id(&self) -> TransactionId {
TransactionId(self.state.current_transaction_id.load(Ordering::SeqCst))
}
#[must_use]
pub fn path(&self) -> &Path {
&self.state.path
}
#[must_use]
pub fn len(&self) -> u64 {
let position = self.state.log_position.lock();
position.file_offset
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
fn fetch_tree_locks<'a>(&self, trees: impl Iterator<Item = &'a [u8]>, locks: &mut TreeLocks) {
let mut trees = trees.collect::<Vec<_>>();
trees.sort_unstable();
let mut tree_locks = self.state.tree_locks.lock();
for tree in trees {
if let Some(lock) = tree_locks.get(&Cow::Borrowed(tree)) {
locks.push(lock.lock());
} else {
let lock = TreeLock::new();
let locked = lock.lock();
tree_locks.insert(Cow::Owned(tree.to_vec()), lock);
locks.push(locked);
}
}
}
#[must_use]
pub fn new_transaction<
'a,
I: IntoIterator<Item = &'a [u8], IntoIter = II>,
II: ExactSizeIterator<Item = &'a [u8]>,
>(
&self,
trees: I,
) -> TransactionHandle {
let trees = trees.into_iter();
let mut locked_trees = Vec::with_capacity(trees.len());
self.fetch_tree_locks(trees, &mut locked_trees);
TransactionHandle {
locked_trees,
transaction: LogEntry {
id: TransactionId(
self.state
.current_transaction_id
.fetch_add(1, Ordering::SeqCst),
),
data: None,
},
}
}
pub(crate) fn note_transaction_id_status(
&self,
transaction_id: TransactionId,
position: Option<u64>,
) {
let mut cache = self.state.known_completed_transactions.lock();
cache.put(transaction_id, position);
}
pub(crate) fn note_transaction_ids_completed(
&self,
transaction_ids: &[(TransactionId, Option<u64>)],
) {
let mut cache = self.state.known_completed_transactions.lock();
for (id, position) in transaction_ids {
cache.put(*id, *position);
}
}
#[allow(clippy::option_option)]
pub(crate) fn transaction_id_position(
&self,
transaction_id: TransactionId,
) -> Option<Option<u64>> {
let mut cache = self.state.known_completed_transactions.lock();
cache.get(&transaction_id).copied()
}
}
impl State {
pub(crate) fn lock_for_write(&self) -> MutexGuard<'_, LogPosition> {
self.state.log_position.lock()
}
}