1
use std::{
2
    ops::{Deref, DerefMut, RangeBounds},
3
    path::{Path, PathBuf},
4
    sync::{
5
        atomic::{AtomicBool, Ordering},
6
        Arc,
7
    },
8
};
9

            
10
use parking_lot::Mutex;
11

            
12
use super::{log::EntryFetcher, LogEntry, State, TransactionLog};
13
use crate::{
14
    error::{Error, InternalError},
15
    io::{FileManager, OperableFile},
16
    transaction::{log::ScanResult, TransactionId},
17
    Context, ErrorKind,
18
};
19

            
20
/// A shared [`TransactionLog`] manager. Allows multiple threads to interact with a single transaction log.
21
98343
#[derive(Debug, Clone)]
22
pub struct TransactionManager<Manager: FileManager> {
23
    state: State,
24
    transaction_sender: flume::Sender<ThreadCommand>,
25
    context: Context<Manager>,
26
}
27

            
28
impl<Manager> TransactionManager<Manager>
29
where
30
    Manager: FileManager,
31
{
32
    /// Spawns a new transaction manager. The transaction manager runs its own
33
    /// thread that writes to the transaction log.
34
26
    pub fn spawn(directory: &Path, context: Context<Manager>) -> Result<Self, Error> {
35
26
        let (transaction_sender, receiver) = flume::bounded(32);
36
26
        let log_path = Self::log_path(directory);
37
26

            
38
26
        let (state_sender, state_receiver) = flume::bounded(1);
39
26
        let thread_context = context.clone();
40
26
        std::thread::Builder::new()
41
26
            .name(String::from("nebari-txlog"))
42
26
            .spawn(move || {
43
26
                ManagerThread::<Manager>::run(&state_sender, &log_path, receiver, thread_context);
44
26
            })
45
26
            .map_err(ErrorKind::message)?;
46

            
47
26
        let state = state_receiver.recv().expect("failed to initialize")?;
48
26
        Ok(Self {
49
26
            state,
50
26
            transaction_sender,
51
26
            context,
52
26
        })
53
26
    }
54

            
55
    /// Creates a new transaction, exclusively locking `trees`. Will block the thread until the trees can be locked.
56
    #[must_use]
57
98293
    pub fn new_transaction<
58
98293
        'a,
59
98293
        I: IntoIterator<Item = &'a [u8], IntoIter = II>,
60
98293
        II: ExactSizeIterator<Item = &'a [u8]>,
61
98293
    >(
62
98293
        &self,
63
98293
        trees: I,
64
98293
    ) -> ManagedTransaction<Manager> {
65
98293
        ManagedTransaction {
66
98293
            transaction: Some(self.state.new_transaction(trees)),
67
98293
            manager: self.clone(),
68
98293
        }
69
98293
    }
70

            
71
    /// Push `transaction` to the log. Once this function returns, the
72
    /// transaction log entry has been fully flushed to disk.
73
89712
    fn push(&self, transaction: TransactionHandle) -> Result<TreeLocks, Error> {
74
89712
        let (completion_sender, completion_receiver) = flume::bounded(1);
75
89712
        self.transaction_sender
76
89712
            .send(ThreadCommand::Commit {
77
89712
                transaction,
78
89712
                completion_sender,
79
89712
            })
80
89712
            .map_err(|_| ErrorKind::Internal(InternalError::TransactionManagerStopped))?;
81
89712
        completion_receiver.recv().map_err(|_| {
82
            Error::from(ErrorKind::Internal(
83
                InternalError::TransactionManagerStopped,
84
            ))
85
89712
        })
86
89712
    }
87

            
88
    /// Scans the transaction log for entries with ids within `range`. Invokes
89
    /// `callback` for each entry found. The scan will always scan forwards
90
    /// starting with the lowest ID matching the range.
91
405
    pub fn scan<Callback: FnMut(LogEntry<'static>) -> bool>(
92
405
        &self,
93
405
        range: impl RangeBounds<TransactionId>,
94
405
        callback: Callback,
95
405
    ) -> Result<(), Error> {
96
405
        let mut log = TransactionLog::<Manager::File>::read(
97
405
            self.state.path(),
98
405
            self.state.clone(),
99
405
            self.context.clone(),
100
405
        )?;
101
405
        log.scan(range, callback)
102
405
    }
103

            
104
    /// Returns true if the transaction id was recorded in the transaction log. This method caches
105
13
    pub fn transaction_was_successful(&self, transaction_id: TransactionId) -> Result<bool, Error> {
106
13
        self.transaction_position(transaction_id)
107
13
            .map(|position| position.is_some())
108
13
    }
109

            
110
    /// Returns the location on disk of the transaction, if found.
111
13
    pub fn transaction_position(
112
13
        &self,
113
13
        transaction_id: TransactionId,
114
13
    ) -> Result<Option<u64>, Error> {
115
13
        if !transaction_id.valid() {
116
            Ok(None)
117
13
        } else if let Some(position) = self.state.transaction_id_position(transaction_id) {
118
5
            Ok(position)
119
        } else {
120
8
            let mut log = self.context.file_manager.read(self.state.path())?;
121
8
            let transaction = log.execute(EntryFetcher {
122
8
                state: self.state(),
123
8
                id: transaction_id,
124
8
                vault: self.context.vault(),
125
8
            })?;
126
8
            match transaction {
127
2
                ScanResult::Found { position, .. } => {
128
2
                    self.state
129
2
                        .note_transaction_id_status(transaction_id, Some(position));
130
2
                    Ok(Some(position))
131
                }
132
                ScanResult::NotFound { .. } => {
133
6
                    self.state.note_transaction_id_status(transaction_id, None);
134
6
                    Ok(None)
135
                }
136
            }
137
        }
138
13
    }
139

            
140
8581
    pub(crate) fn drop_transaction_id(&self, transaction_id: TransactionId) {
141
8581
        drop(
142
8581
            self.transaction_sender
143
8581
                .send(ThreadCommand::Drop(transaction_id)),
144
8581
        );
145
8581
    }
146

            
147
26
    fn log_path(directory: &Path) -> PathBuf {
148
26
        directory.join("_transactions")
149
26
    }
150

            
151
    /// Returns the current state of the transaction log.
152
    #[must_use]
153
8
    pub fn state(&self) -> &State {
154
8
        self
155
8
    }
156
}
157

            
158
impl<Manager: FileManager> Deref for TransactionManager<Manager> {
159
    type Target = State;
160

            
161
43
    fn deref(&self) -> &Self::Target {
162
43
        &self.state
163
43
    }
164
}
165

            
166
enum ThreadCommand {
167
    Commit {
168
        transaction: TransactionHandle,
169
        completion_sender: flume::Sender<TreeLocks>,
170
    },
171
    Drop(TransactionId),
172
}
173

            
174
struct ManagerThread<Manager: FileManager> {
175
    state: ThreadState,
176
    commands: flume::Receiver<ThreadCommand>,
177
    log: TransactionLog<Manager::File>,
178
    pending_transaction_ids: IdSequence,
179
    last_processed_id: TransactionId,
180
    transaction_batch: Vec<LogEntry<'static>>,
181
    completion_senders: Vec<(flume::Sender<Vec<TreeLockHandle>>, Vec<TreeLockHandle>)>,
182
}
183

            
184
/// Phases of the manager thread's command loop.
enum ThreadState {
    /// No batch in progress; block until the next command arrives.
    Fresh,
    /// Collecting transactions without gaps in their ids; the batch is written
    /// as soon as no further command is immediately available.
    Batching,
    /// Some lower transaction ids are still outstanding; block for commands
    /// until they have all been accounted for.
    EnsuringSequence,
}
189

            
190
impl<Manager: FileManager> ManagerThread<Manager> {
191
    const BATCH: usize = 16;
192

            
193
26
    fn run(
194
26
        state_sender: &flume::Sender<Result<State, Error>>,
195
26
        log_path: &Path,
196
26
        transactions: flume::Receiver<ThreadCommand>,
197
26
        context: Context<Manager>,
198
26
    ) {
199
26
        let state = State::from_path(log_path);
200

            
201
26
        let log = match TransactionLog::<Manager::File>::initialize_state(&state, &context)
202
26
            .and_then(|_| TransactionLog::<Manager::File>::open(log_path, state.clone(), context))
203
        {
204
26
            Ok(log) => log,
205
            Err(err) => {
206
                drop(state_sender.send(Err(err)));
207
                return;
208
            }
209
        };
210
26
        let transaction_id = log.state().next_transaction_id();
211
26
        drop(state_sender.send(Ok(state)));
212
26

            
213
26
        Self {
214
26
            state: ThreadState::Fresh,
215
26
            commands: transactions,
216
26
            last_processed_id: transaction_id,
217
26
            pending_transaction_ids: IdSequence::new(transaction_id),
218
26
            log,
219
26
            transaction_batch: Vec::with_capacity(Self::BATCH),
220
26
            completion_senders: Vec::with_capacity(Self::BATCH),
221
26
        }
222
26
        .save_transactions();
223
26
    }
224

            
225
26
    fn save_transactions(mut self) {
226
135147
        while self.process_next_command() {}
227
26
    }
228

            
229
135147
    fn process_next_command(&mut self) -> bool {
230
135147
        match self.state {
231
40841
            ThreadState::Fresh => self.process_next_command_fresh(),
232
74363
            ThreadState::Batching => self.process_next_command_batching(),
233
19943
            ThreadState::EnsuringSequence => self.process_next_command_ensuring_sequence(),
234
        }
235
135147
    }
236

            
237
40841
    fn process_next_command_fresh(&mut self) -> bool {
238
40841
        match self.commands.recv() {
239
40815
            Ok(command) => {
240
40815
                match command {
241
                    ThreadCommand::Commit {
242
                        transaction:
243
                            TransactionHandle {
244
37298
                                transaction,
245
37298
                                locked_trees,
246
37298
                            },
247
37298
                        completion_sender,
248
37298
                    } => {
249
37298
                        self.pending_transaction_ids.note(transaction.id);
250
37298
                        if self.pending_transaction_ids.complete() {
251
36880
                            // Safe to start a new batch
252
36880
                            self.last_processed_id = transaction.id;
253
36880
                            self.state = ThreadState::Batching;
254
36880
                        } else {
255
418
                            // Need to wait for IDs
256
418
                            self.state = ThreadState::EnsuringSequence;
257
418
                        }
258

            
259
37298
                        self.transaction_batch.push(transaction);
260
37298
                        self.completion_senders
261
37298
                            .push((completion_sender, locked_trees));
262
                    }
263
3517
                    ThreadCommand::Drop(id) => {
264
3517
                        self.mark_transaction_handled(id);
265
3517
                    }
266
                }
267
40815
                true
268
            }
269
26
            Err(_) => false,
270
        }
271
40841
    }
272

            
273
23460
    fn mark_transaction_handled(&mut self, id: TransactionId) {
274
23460
        self.pending_transaction_ids.note(id);
275
23460
        if self.pending_transaction_ids.complete() && !self.transaction_batch.is_empty() {
276
470
            self.commit_transaction_batch();
277
22990
        }
278
23460
    }
279

            
280
    fn process_next_command_batching(&mut self) -> bool {
281
74363
        match self.commands.try_recv() {
282
37535
            Ok(command) => {
283
37535
                match command {
284
                    ThreadCommand::Commit {
285
                        transaction:
286
                            TransactionHandle {
287
37534
                                transaction,
288
37534
                                locked_trees,
289
37534
                            },
290
37534
                        completion_sender,
291
37534
                    } => {
292
37534
                        // Ensure this transaction can be batched. If not,
293
37534
                        // commit and enqueue it.
294
37534
                        self.note_potentially_sequntial_id(transaction.id);
295
37534
                        self.transaction_batch.push(transaction);
296
37534
                        self.completion_senders
297
37534
                            .push((completion_sender, locked_trees));
298
37534
                    }
299
1
                    ThreadCommand::Drop(id) => {
300
1
                        self.note_potentially_sequntial_id(id);
301
1
                    }
302
                }
303
37535
                true
304
            }
305
            Err(flume::TryRecvError::Empty) => {
306
                // No more pending transactions are ready.
307
36828
                self.commit_transaction_batch();
308
36828
                true
309
            }
310
            Err(flume::TryRecvError::Disconnected) => false,
311
        }
312
74363
    }
313

            
314
37535
    fn note_potentially_sequntial_id(&mut self, id: TransactionId) {
315
37535
        self.pending_transaction_ids.note(id);
316
37535
        if self.pending_transaction_ids.complete() {
317
37483
            // Safe to start a new batch
318
37483
            self.last_processed_id = id;
319
37483
            self.state = ThreadState::Batching;
320
37483
        } else {
321
52
            if !self.transaction_batch.is_empty() {
322
52
                self.commit_transaction_batch();
323
52
            }
324
52
            self.state = ThreadState::EnsuringSequence;
325
        }
326
37535
    }
327

            
328
19943
    fn process_next_command_ensuring_sequence(&mut self) -> bool {
329
19943
        match self.commands.recv() {
330
19943
            Ok(command) => {
331
19943
                match command {
332
                    ThreadCommand::Commit {
333
                        transaction:
334
                            TransactionHandle {
335
14880
                                transaction,
336
14880
                                locked_trees,
337
14880
                            },
338
14880
                        completion_sender,
339
14880
                    } => {
340
14880
                        let transaction_id = transaction.id;
341
14880
                        self.transaction_batch.push(transaction);
342
14880
                        self.completion_senders
343
14880
                            .push((completion_sender, locked_trees));
344
14880
                        self.mark_transaction_handled(transaction_id);
345
14880
                    }
346
5063
                    ThreadCommand::Drop(id) => {
347
5063
                        self.mark_transaction_handled(id);
348
5063
                    }
349
                }
350
19943
                true
351
            }
352
            Err(_) => false,
353
        }
354
19943
    }
355

            
356
37350
    fn commit_transaction_batch(&mut self) {
357
37350
        let mut transaction_batch = Vec::with_capacity(Self::BATCH);
358
37350
        std::mem::swap(&mut transaction_batch, &mut self.transaction_batch);
359
129970
        transaction_batch.sort_unstable_by(|a, b| a.id.cmp(&b.id));
360
37350
        self.last_processed_id = transaction_batch.last().unwrap().id;
361
37350
        self.state = ThreadState::Fresh;
362
37350
        self.log.push(transaction_batch).unwrap();
363
89712
        for (completion_sender, tree_locks) in self.completion_senders.drain(..) {
364
89712
            drop(completion_sender.send(tree_locks));
365
89712
        }
366
37350
    }
367
}
368

            
369
/// A transaction that is managed by a [`TransactionManager`].
370
pub struct ManagedTransaction<Manager: FileManager> {
371
    pub(crate) manager: TransactionManager<Manager>,
372
    pub(crate) transaction: Option<TransactionHandle>,
373
}
374

            
375
impl<Manager: FileManager> Drop for ManagedTransaction<Manager> {
    /// Notifies the manager that a transaction still held here was never
    /// committed. The handle, and its tree locks, are released afterwards.
    fn drop(&mut self) {
        let abandoned = self.transaction.take();
        if let Some(handle) = abandoned {
            self.manager.drop_transaction_id(handle.id);
        }
    }
}
382
impl<Manager: FileManager> ManagedTransaction<Manager> {
383
    /// Commits the transaction to the transaction manager that created this
384
    /// transaction.
385
    #[allow(clippy::missing_panics_doc)] // Should be unreachable
386
89712
    pub fn commit(mut self) -> Result<TreeLocks, Error> {
387
89712
        let transaction = self.transaction.take().unwrap();
388
89712
        self.manager.push(transaction)
389
89712
    }
390

            
391
    /// Rolls the transaction back. It is not necessary to call this function --
392
    /// transactions will automatically be rolled back when the handle is
393
    /// dropped, if `commit()` isn't called first.
394
5080
    pub fn rollback(self) {
395
5080
        drop(self);
396
5080
    }
397
}
398

            
399
impl<Manager: FileManager> Deref for ManagedTransaction<Manager> {
400
    type Target = LogEntry<'static>;
401

            
402
59915
    fn deref(&self) -> &Self::Target {
403
59915
        self.transaction.as_ref().unwrap()
404
59915
    }
405
}
406

            
407
impl<Manager: FileManager> DerefMut for ManagedTransaction<Manager> {
408
    fn deref_mut(&mut self) -> &mut Self::Target {
409
        &mut self.transaction.as_mut().unwrap().transaction
410
    }
411
}
412

            
413
/// A handle to an executing transaction.
414
pub struct TransactionHandle {
415
    /// The transaction being executed.
416
    pub(crate) transaction: LogEntry<'static>,
417
    /// The trees locked by this transaction.
418
    pub(crate) locked_trees: TreeLocks,
419
}
420

            
421
/// A collection of handles that keep trees locked.
422
pub type TreeLocks = Vec<TreeLockHandle>;
423

            
424
/// An acquirable lock for a tree.
425
#[derive(Debug)]
426
pub struct TreeLock {
427
    data: Arc<TreeLockData>,
428
}
429

            
430
impl TreeLock {
431
11429
    pub(crate) fn new() -> Self {
432
11429
        Self {
433
11429
            data: Arc::new(TreeLockData {
434
11429
                locked: AtomicBool::new(false),
435
11429
                blocked: Mutex::default(),
436
11429
            }),
437
11429
        }
438
11429
    }
439

            
440
114305
    pub(crate) fn lock(&self) -> TreeLockHandle {
441
        // Loop until we acquire a lock
442
        loop {
443
            // Try to acquire the lock without any possibility of blocking
444
146129
            if self
445
146129
                .data
446
146129
                .locked
447
146129
                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
448
146129
                .is_ok()
449
            {
450
113864
                break;
451
32265
            }
452

            
453
31824
            let unblocked_receiver = {
454
32265
                let mut blocked = self.data.blocked.lock();
455
32265
                // Now that we've acquired this lock, it's possible the lock has
456
32265
                // been released. If there are no others waiting, we can re-lock
457
32265
                // it. If there werealready others waiting, we want to allow
458
32265
                // them to have a chance to wake up first, so we assume that the
459
32265
                // lock is locked without checking.
460
32265
                if blocked.is_empty()
461
32265
                    && self
462
32265
                        .data
463
32265
                        .locked
464
32265
                        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
465
32265
                        .is_ok()
466
                {
467
441
                    break;
468
31824
                }
469
31824

            
470
31824
                // Add a new sender to the blocked list, and return it so that
471
31824
                // we can wait for it to be signalled.
472
31824
                let (unblocked_sender, unblocked_receiver) = flume::bounded(1);
473
31824
                blocked.push(unblocked_sender);
474
31824
                unblocked_receiver
475
31824
            };
476
31824
            // Wait for our unblocked signal to be triggered before trying to acquire the lock again.
477
31824
            let _ = unblocked_receiver.recv();
478
        }
479

            
480
114305
        TreeLockHandle(Self {
481
114305
            data: self.data.clone(),
482
114305
        })
483
114305
    }
484
}
485

            
486
#[derive(Debug)]
487
struct TreeLockData {
488
    locked: AtomicBool,
489
    blocked: Mutex<Vec<flume::Sender<()>>>,
490
}
491

            
492
/// A handle to a lock. Upon dropping, the lock will be released.
493
#[derive(Debug)]
494
pub struct TreeLockHandle(TreeLock);
495

            
496
impl Drop for TreeLockHandle {
497
114305
    fn drop(&mut self) {
498
114305
        self.0.data.locked.store(false, Ordering::SeqCst);
499
114305

            
500
114305
        let data = self.0.data.clone();
501
114305
        let mut blocked = data.blocked.lock();
502
114305
        for blocked in blocked.drain(..) {
503
31824
            let _ = blocked.send(());
504
31824
        }
505
114305
    }
506
}
507

            
508
impl Deref for TransactionHandle {
509
    type Target = LogEntry<'static>;
510

            
511
84193
    fn deref(&self) -> &Self::Target {
512
84193
        &self.transaction
513
84193
    }
514
}
515

            
516
impl DerefMut for TransactionHandle {
517
6001
    fn deref_mut(&mut self) -> &mut Self::Target {
518
6001
        &mut self.transaction
519
6001
    }
520
}
521

            
522
struct IdSequence {
523
    start: u64,
524
    length: u64,
525
    statuses: Vec<usize>,
526
}
527

            
528
impl IdSequence {
529
29
    pub const fn new(start: TransactionId) -> Self {
530
29
        Self {
531
29
            start: start.0,
532
29
            length: 0,
533
29
            statuses: Vec::new(),
534
29
        }
535
29
    }
536

            
537
99387
    pub fn note(&mut self, id: TransactionId) {
538
99387
        self.length = ((id.0 + 1).checked_sub(self.start).unwrap()).max(self.length);
539
99387
        let offset = usize::try_from(id.0.checked_sub(self.start).unwrap()).unwrap();
540
99387
        let index = offset / (usize::BITS as usize);
541
99387
        if self.statuses.len() < index + 1 {
542
78472
            self.statuses.resize(index + 1, 0);
543
78472
        }
544

            
545
99387
        let bit_offset = offset % (usize::BITS as usize);
546
99387
        self.statuses[index] |= 1 << bit_offset;
547
99387

            
548
99387
        self.truncate();
549
99387
    }
550

            
551
99387
    pub const fn complete(&self) -> bool {
552
99387
        self.length == 0
553
99387
    }
554

            
555
99387
    fn truncate(&mut self) {
556
177895
        while self.length > 0 {
557
99547
            let mask_bits = usize::try_from(self.length).unwrap();
558
99547
            let mask_bits = mask_bits.min(usize::BITS as usize);
559
99547
            let mask = usize::MAX >> (usize::BITS as usize - mask_bits);
560
99547
            if self.statuses[0] & mask == mask {
561
78508
                self.statuses.remove(0);
562
78508
                let mask_bits = u64::try_from(mask_bits).unwrap();
563
78508
                self.start += mask_bits;
564
78508
                self.length -= mask_bits;
565
78508
            } else {
566
21039
                break;
567
            }
568
        }
569
99387
    }
570
}
571

            
572
1
#[test]
fn id_sequence_tests() {
    // Out-of-order ids only complete once every gap has been filled.
    let mut seq = IdSequence::new(TransactionId(1));
    let steps = [(3, false), (1, false), (2, true), (4, true)];
    for &(id, expect_complete) in steps.iter() {
        seq.note(TransactionId(id));
        assert_eq!(seq.complete(), expect_complete);
    }

    // Ids noted in descending order stay incomplete until the first id
    // arrives, including when they span several bitset words.
    for &(first, last) in &[(0_u64, 65_u64), (1, 1024)] {
        let mut seq = IdSequence::new(TransactionId(first));
        for id in (first..=last).rev() {
            seq.note(TransactionId(id));
            assert_eq!(id == first, seq.complete());
        }
    }
}