//! IO abstractions for Nebari.
//!
//! Nebari was written to have the flexibility of multiple backend options. This
//! may allow Nebari to target `no_std` in the future or allow for other IO
//! strategies to be implemented in addition to the ones seen here today.
6

            
7
use std::{
8
    borrow::Borrow,
9
    collections::HashSet,
10
    fmt::Debug,
11
    hash::Hash,
12
    io::{Read, Seek, Write},
13
    path::{Path, PathBuf},
14
    sync::{
15
        atomic::{AtomicU64, Ordering},
16
        Arc,
17
    },
18
};
19

            
20
use parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
21

            
22
use crate::error::Error;
23

            
24
/// A wrapper type for any file type.
25
pub mod any;
26
/// Filesystem IO provided by `std::fs`.
27
pub mod fs;
28
/// A virtual memory-based filesystem.
29
pub mod memory;
30

            
31
/// A file that is managed by a [`FileManager`].
32
pub trait ManagedFile: File {
33
    /// The file manager that synchronizes file access across threads.
34
    type Manager: FileManager<File = Self>;
35
}
36

            
37
/// A generic file trait.
38
pub trait File: Debug + Send + Sync + Seek + Read + Write + 'static {
39
    /// Returns the unique ID of this file. Only unique within the manager it
40
    /// was opened from.
41
    fn id(&self) -> &PathId;
42

            
43
    /// Returns the length of the file.
44
    fn length(&self) -> Result<u64, Error>;
45

            
46
    /// Synchronizes data and metadata to the final destination. This calls
47
    /// [`std::fs::File::sync_all()`] on files, which ensures all filesystem
48
    /// metadata (such as newly allocated blocks) and data is synchronized to
49
    /// the destination device.
50
    fn synchronize(&mut self) -> Result<(), Error>;
51

            
52
    /// Safely closes the file after flushing any pending operations to disk.
53
    fn close(self) -> Result<(), Error>;
54
}
55

            
56
/// A type that can open managed files.
57
pub trait ManagedFileOpener<File>
58
where
59
    File: ManagedFile,
60
{
61
    /// Opens a file at `path` with read-only permission.
62
    fn open_for_read(&self, path: impl IntoPathId + Send) -> Result<File, Error>;
63

            
64
    /// Opens or creates a file at `path`, positioning the cursor at the end of the file.
65
    fn open_for_append(&self, path: impl IntoPathId + Send) -> Result<File, Error>;
66
}
67

            
68
/// A manager that is responsible for controlling write access to a file.
69
pub trait FileManager:
70
    ManagedFileOpener<Self::File> + Default + Clone + Debug + Send + Sync + 'static
71
{
72
    /// The type of file managed by this manager.
73
    type File: ManagedFile<Manager = Self>;
74
    /// A file handle type, which can have operations executed against it.
75
    type FileHandle: OpenableFile<Self::File> + OperableFile<Self::File>;
76

            
77
    /// Returns the `PathId` for the given path. If the file manager does not
78
    /// know of this path and `create_if_not_found` is false, None will be
79
    /// returned. Otherwise, a value will always be returned.
80
    ///
81
    /// Until a path is deleted, the same `PathId` will be returned for the same
82
    /// `Path`.
83
    fn resolve_path(&self, path: impl AsRef<Path>, create_if_not_found: bool) -> Option<PathId>;
84

            
85
    /// Returns a file handle that can be used for reading operations.
86
    fn read(&self, path: impl IntoPathId) -> Result<Self::FileHandle, Error>;
87

            
88
    /// Returns a file handle that can be used to read and write.
89
    fn append(&self, path: impl IntoPathId) -> Result<Self::FileHandle, Error>;
90

            
91
    /// Returns the length of the file.
92
    fn file_length(&self, path: impl IntoPathId) -> Result<u64, Error>;
93

            
94
    /// Check if the file exists.
95
    fn exists(&self, path: impl IntoPathId) -> Result<bool, Error>;
96

            
97
    /// Closes all open handles for `path`, and calls `publish_callback` before
98
    /// unlocking any locks aquired during the operation.
99
    fn close_handles<F: FnOnce(PathId)>(&self, path: impl IntoPathId, publish_callback: F);
100

            
101
    /// Check if the file exists.
102
    fn delete(&self, path: impl IntoPathId) -> Result<bool, Error>;
103

            
104
    /// Removes a directory and all of its contents.
105
    fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error>;
106
}
107

            
108
/// A file that can have operations performed on it.
109
pub trait OpenableFile<F: ManagedFile>: Debug + Sized + Send + Sync {
110
    /// Returns the id of the file assigned from the file manager.
111
    fn id(&self) -> &PathId;
112

            
113
    /// Replaces the current file with the file located at `path` atomically.
114
    fn replace_with<C: FnOnce(PathId)>(
115
        self,
116
        replacement: F,
117
        manager: &F::Manager,
118
        publish_callback: C,
119
    ) -> Result<Self, Error>;
120

            
121
    /// Closes the file. This may not actually close the underlying file,
122
    /// depending on what other tasks have access to the underlying file as
123
    /// well.
124
    fn close(self) -> Result<(), Error>;
125
}
126

            
127
/// A file that can have an operation performed against it.
128
pub trait OperableFile<File>
129
where
130
    File: ManagedFile,
131
{
132
    /// Executes an operation and returns the results.
133
    fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output;
134
}
135

            
136
/// An operation to perform on a file.
137
pub trait FileOp<Output> {
138
    /// Executes the operation and returns the result.
139
    fn execute(self, file: &mut dyn File) -> Output;
140
}
141

            
142
/// A unique ID for a path.
///
/// A `PathId` pairs a path with an optional numeric id. Values created directly
/// from a path-like type carry no id (`None`).
///
/// Equality and hashing are both defined purely by the path. This keeps `Eq`
/// consistent with `Hash`, and consistent with the [`Borrow<Path>`]
/// implementation that allows hashed collections of `PathId` to be queried
/// with a plain [`Path`]. The numeric id never takes part in comparisons.
#[derive(Clone, Debug)]
pub struct PathId {
    /// The numeric id, if one has been assigned.
    id: Option<u64>,
    /// The path this id refers to. Shared so that clones are cheap.
    path: Arc<PathBuf>,
}

impl PathId {
    /// Returns the id of the path, if present.
    #[must_use]
    pub const fn id(&self) -> Option<u64> {
        self.id
    }

    /// Returns the original path of this ID.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl Hash for PathId {
    /// Hashes only the path, matching how [`Path`] itself hashes. This is
    /// required by the `Borrow<Path>` implementation.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.path.hash(state);
    }
}

impl Eq for PathId {}

impl PartialEq for PathId {
    /// Two ids are equal exactly when they refer to the same path.
    ///
    /// The numeric id is deliberately ignored. Comparing ids first would make
    /// every pair of id-less values equal (`None == None`) regardless of their
    /// paths, which contradicts the path-based `Hash` implementation.
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path
    }
}

impl PartialEq<Path> for PathId {
    /// Compares the stored path against `other`.
    fn eq(&self, other: &Path) -> bool {
        self.path.as_path() == other
    }
}

impl Borrow<Path> for PathId {
    /// Exposes the path so hashed collections keyed by `PathId` can be queried
    /// with a `&Path`.
    fn borrow(&self) -> &Path {
        self.path.as_path()
    }
}
188

            
189
/// Resolves path-like types into the parts used by [`PathId`].
190
pub trait IntoPathId {
191
    /// Returns `self` as a `PathId`.
192
    fn into_path_id(self) -> PathId;
193
    /// Returns this type as a path.
194
    fn path(&self) -> &Path;
195
    /// Returns the id of this path, or None if not available.
196
    fn id(&self) -> Option<u64>;
197
}
198

            
199
impl<'a> IntoPathId for &'a Path {
200
26396
    fn into_path_id(self) -> PathId {
201
26396
        PathId {
202
26396
            id: None,
203
26396
            path: Arc::new(self.to_path_buf()),
204
26396
        }
205
26396
    }
206

            
207
    fn id(&self) -> Option<u64> {
208
        None
209
    }
210

            
211
3022
    fn path(&self) -> &Path {
212
3022
        self
213
3022
    }
214
}
215

            
216
impl<'a> IntoPathId for &'a PathBuf {
217
3531
    fn into_path_id(self) -> PathId {
218
3531
        PathId {
219
3531
            id: None,
220
3531
            path: Arc::new(self.clone()),
221
3531
        }
222
3531
    }
223

            
224
    fn id(&self) -> Option<u64> {
225
        None
226
    }
227

            
228
8
    fn path(&self) -> &Path {
229
8
        self
230
8
    }
231
}
232

            
233
impl IntoPathId for PathBuf {
234
26
    fn into_path_id(self) -> PathId {
235
26
        PathId {
236
26
            id: None,
237
26
            path: Arc::new(self),
238
26
        }
239
26
    }
240

            
241
    fn id(&self) -> Option<u64> {
242
        None
243
    }
244

            
245
    fn path(&self) -> &Path {
246
        self
247
    }
248
}
249

            
250
impl IntoPathId for Arc<PathBuf> {
251
2703
    fn into_path_id(self) -> PathId {
252
2703
        PathId {
253
2703
            id: None,
254
2703
            path: self,
255
2703
        }
256
2703
    }
257

            
258
    fn id(&self) -> Option<u64> {
259
        None
260
    }
261

            
262
2703
    fn path(&self) -> &Path {
263
2703
        self
264
2703
    }
265
}
266

            
267
impl<'a> IntoPathId for &'a Arc<PathBuf> {
268
3506
    fn into_path_id(self) -> PathId {
269
3506
        PathId {
270
3506
            id: None,
271
3506
            path: self.clone(),
272
3506
        }
273
3506
    }
274

            
275
    fn id(&self) -> Option<u64> {
276
        None
277
    }
278

            
279
    fn path(&self) -> &Path {
280
        self
281
    }
282
}
283

            
284
impl IntoPathId for PathId {
285
74211
    fn into_path_id(self) -> PathId {
286
74211
        self
287
74211
    }
288

            
289
1606
    fn id(&self) -> Option<u64> {
290
1606
        self.id
291
1606
    }
292

            
293
803
    fn path(&self) -> &Path {
294
803
        &self.path
295
803
    }
296
}
297

            
298
impl<'a> IntoPathId for &'a PathId {
299
48083
    fn into_path_id(self) -> PathId {
300
48083
        self.clone()
301
48083
    }
302

            
303
    fn id(&self) -> Option<u64> {
304
        self.id
305
    }
306

            
307
10116
    fn path(&self) -> &Path {
308
10116
        &self.path
309
10116
    }
310
}
311

            
312
impl IntoPathId for String {
313
2565
    fn into_path_id(self) -> PathId {
314
2565
        PathId {
315
2565
            id: None,
316
2565
            path: Arc::new(PathBuf::from(self)),
317
2565
        }
318
2565
    }
319

            
320
    fn id(&self) -> Option<u64> {
321
        None
322
    }
323

            
324
    fn path(&self) -> &Path {
325
        Path::new(self)
326
    }
327
}
328

            
329
impl<'a> IntoPathId for &'a str {
330
1
    fn into_path_id(self) -> PathId {
331
1
        PathId {
332
1
            id: None,
333
1
            path: Arc::new(PathBuf::from(self)),
334
1
        }
335
1
    }
336

            
337
    fn id(&self) -> Option<u64> {
338
        None
339
    }
340

            
341
    fn path(&self) -> &Path {
342
        Path::new(self)
343
    }
344
}
345

            
346
/// Converts between paths and unique IDs.
347
167202
#[derive(Default, Clone, Debug)]
348
pub struct PathIds {
349
    file_id_counter: Arc<AtomicU64>,
350
    file_ids: Arc<RwLock<HashSet<PathId>>>,
351
}
352

            
353
impl PathIds {
354
66334
    fn file_id_for_path(&self, path: impl IntoPathId, insert_if_not_found: bool) -> Option<PathId> {
355
66334
        let file_ids = self.file_ids.upgradable_read();
356
66334
        let path = path.into_path_id();
357
66334
        if let Some(id) = file_ids.get(&path) {
358
65422
            Some(id.clone())
359
912
        } else if insert_if_not_found {
360
899
            let mut file_ids = RwLockUpgradableReadGuard::upgrade(file_ids);
361
899
            // Assume that in the optimal flow, multiple threads aren't asking
362
899
            // to open the same path for the first time.
363
899
            let new_id = PathId {
364
899
                path: path.path.clone(),
365
899
                id: Some(self.file_id_counter.fetch_add(1, Ordering::SeqCst)),
366
899
            };
367
899
            if file_ids.insert(new_id.clone()) {
368
899
                Some(new_id)
369
            } else {
370
                file_ids.get(&path).cloned()
371
            }
372
        } else {
373
13
            None
374
        }
375
66334
    }
376

            
377
1606
    fn remove_file_id_for_path(&self, path: impl IntoPathId) -> Option<PathId> {
378
1606
        let mut file_ids = self.file_ids.write();
379
1606
        if path.id().is_some() {
380
803
            file_ids.take(&path.into_path_id())
381
        } else {
382
803
            file_ids.take(path.path())
383
        }
384
1606
    }
385

            
386
2703
    fn recreate_file_id_for_path(&self, path: impl IntoPathId) -> Option<RecreatedFile<'_>> {
387
2703
        let existing = path.into_path_id();
388
2703
        let mut file_ids = self.file_ids.write();
389
2703
        let new_id = self.file_id_counter.fetch_add(1, Ordering::SeqCst);
390
2703
        let new_id = PathId {
391
2703
            path: existing.path,
392
2703
            id: Some(new_id),
393
2703
        };
394
2703
        file_ids
395
2703
            .replace(new_id.clone())
396
2703
            .map(|old_id| RecreatedFile {
397
2703
                previous_id: old_id,
398
2703
                new_id,
399
2703
                _guard: file_ids,
400
2703
            })
401
2703
    }
402

            
403
    fn remove_file_ids_for_path_prefix(&self, path: &Path) -> Vec<PathId> {
404
        let mut file_ids = self.file_ids.write();
405
        let mut ids_to_remove = Vec::new();
406
        let mut paths_to_remove = Vec::new();
407
        for id in &*file_ids {
408
            if id.path().starts_with(path) {
409
                paths_to_remove.push(id.clone());
410
                ids_to_remove.push(id.clone());
411
            }
412
        }
413

            
414
        for path in paths_to_remove {
415
            file_ids.remove(&path);
416
        }
417

            
418
        ids_to_remove
419
    }
420
}
421

            
422
/// A file that has had its contents replaced. While this value exists, all
423
/// other threads will be blocked from interacting with the [`PathIds`]
424
/// structure. Only hold onto this value for short periods of time.
425
#[derive(Debug)]
426
pub struct RecreatedFile<'a> {
427
    /// The file's previous id.
428
    pub previous_id: PathId,
429
    /// The file's new id.
430
    pub new_id: PathId,
431
    _guard: RwLockWriteGuard<'a, HashSet<PathId>>,
432
}