1
use std::{
2
    collections::{HashMap, VecDeque},
3
    fs::{File, OpenOptions},
4
    io::{Read, Seek, SeekFrom, Write},
5
    path::Path,
6
    sync::Arc,
7
};
8

            
9
use parking_lot::Mutex;
10

            
11
use super::{FileManager, FileOp, ManagedFile, OpenableFile};
12
use crate::{
13
    error::Error,
14
    io::{File as _, IntoPathId, ManagedFileOpener, OperableFile, PathId, PathIds},
15
};
16

            
17
/// An open file that uses [`std::fs`].
18
#[derive(Debug)]
19
pub struct StdFile {
20
    file: File,
21
    id: PathId,
22
}
23

            
24
impl ManagedFile for StdFile {
25
    type Manager = StdFileManager;
26
}
27

            
28
impl super::File for StdFile {
29
10264856
    fn id(&self) -> &PathId {
30
10264856
        &self.id
31
10264856
    }
32

            
33
317995
    fn length(&self) -> Result<u64, Error> {
34
317995
        let metadata = self.file.metadata()?;
35
317995
        Ok(metadata.len())
36
317995
    }
37

            
38
2703
    fn close(mut self) -> Result<(), Error> {
39
2703
        self.synchronize()
40
2703
    }
41

            
42
59884
    fn synchronize(&mut self) -> Result<(), crate::Error> {
43
59884
        self.file.sync_data().map_err(Error::from)
44
59884
    }
45
}
46

            
47
/// A [`ManagedFileOpener`] implementation that produces [`StdFile`]s.
///
/// This type carries no state; every call opens a brand new file handle.
pub struct StdFileOpener;
49

            
50
impl ManagedFileOpener<StdFile> for StdFileOpener {
51
14366
    fn open_for_read(&self, path: impl IntoPathId + Send) -> Result<StdFile, Error> {
52
14366
        let path = path.into_path_id();
53
14366
        Ok(StdFile {
54
14366
            file: File::open(path.path())?,
55
14366
            id: path,
56
        })
57
14366
    }
58

            
59
5462
    fn open_for_append(&self, path: impl IntoPathId + Send) -> Result<StdFile, Error> {
60
5462
        let path = path.into_path_id();
61
5462
        Ok(StdFile {
62
5462
            file: OpenOptions::new()
63
5462
                .write(true)
64
5462
                .append(true)
65
5462
                .read(true)
66
5462
                .create(true)
67
5462
                .open(path.path())?,
68
5462
            id: path,
69
        })
70
5462
    }
71
}
72

            
73
impl Seek for StdFile {
74
8780002
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self,)))]
75
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
76
        self.file.seek(pos)
77
    }
78
}
79

            
80
impl Write for StdFile {
81
275650
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, buf)))]
82
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
83
        self.file.write(buf)
84
    }
85

            
86
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
87
    fn flush(&mut self) -> std::io::Result<()> {
88
        self.file.flush()
89
    }
90
}
91

            
92
impl Read for StdFile {
93
15862902
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, buf)))]
94
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
95
        self.file.read(buf)
96
    }
97
}
98

            
99
/// The [`FileManager`] for [`StdFile`].
100
86479
#[derive(Debug, Default, Clone)]
101
pub struct StdFileManager {
102
    file_ids: PathIds,
103
    open_files: Arc<Mutex<HashMap<u64, FileSlot>>>,
104
    reader_files: Arc<Mutex<HashMap<u64, VecDeque<StdFile>>>>,
105
}
106

            
107
#[derive(Debug)]
108
enum FileSlot {
109
    Available(StdFile),
110
    Taken,
111
    Waiting(flume::Sender<StdFile>),
112
}
113

            
114
impl FileManager for StdFileManager {
115
    type File = StdFile;
116
    type FileHandle = OpenStdFile;
117

            
118
28
    fn resolve_path(&self, path: impl AsRef<Path>, create_if_not_found: bool) -> Option<PathId> {
119
28
        self.file_ids
120
28
            .file_id_for_path(path.as_ref(), create_if_not_found)
121
28
    }
122

            
123
10015
    fn read(&self, path: impl IntoPathId) -> Result<Self::FileHandle, Error> {
124
10015
        let path = path.into_path_id();
125
10015
        let file_id = self.file_ids.file_id_for_path(path, true).unwrap();
126
10015

            
127
10015
        let mut reader_files = self.reader_files.lock();
128
10015
        let files = reader_files.entry(file_id.id().unwrap()).or_default();
129

            
130
10015
        if let Some(file) = files.pop_front() {
131
5704
            return Ok(OpenStdFile {
132
5704
                file: Some(file),
133
5704
                manager: Some(self.clone()),
134
5704
                reader: true,
135
5704
            });
136
4311
        }
137

            
138
4311
        let file = StdFileOpener.open_for_read(file_id)?;
139
4311
        Ok(OpenStdFile {
140
4311
            file: Some(file),
141
4311
            manager: Some(self.clone()),
142
4311
            reader: true,
143
4311
        })
144
10015
    }
145

            
146
28108
    fn append(&self, path: impl IntoPathId) -> Result<Self::FileHandle, Error> {
147
28108
        let path = path.into_path_id();
148
28108
        let file_id = self.file_ids.file_id_for_path(path, true).unwrap();
149
28108
        let mut open_files = self.open_files.lock();
150
28108
        if let Some(open_file) = open_files.get_mut(&file_id.id().unwrap()) {
151
25349
            let mut file = FileSlot::Taken;
152
25349
            std::mem::swap(&mut file, open_file);
153
25349
            let file = match file {
154
22493
                FileSlot::Available(file) => file,
155
2856
                other => {
156
2856
                    let (file_sender, file_receiver) = flume::bounded(1);
157
2856
                    *open_file = FileSlot::Waiting(file_sender);
158
2856
                    drop(open_files);
159
2856

            
160
2856
                    match file_receiver.recv() {
161
2856
                        Ok(file) => {
162
                            // If we stole the slot from another waiter (shouldn't
163
                            // happen in real usage), we need to reinstall it.
164
2856
                            if let FileSlot::Waiting(other_sender) = other {
165
1215
                                let mut open_files = self.open_files.lock();
166
1215
                                if let Some(open_file) = open_files.get_mut(&file_id.id().unwrap())
167
1215
                                {
168
1215
                                    *open_file = FileSlot::Waiting(other_sender);
169
1215
                                }
170
1641
                            }
171
2856
                            file
172
                        }
173
                        Err(flume::RecvError::Disconnected) => {
174
                            // If we are disconnected, we should recurse to try
175
                            // to acquire the file again.
176
                            return self.append(file_id);
177
                        }
178
                    }
179
                }
180
            };
181
25349
            Ok(OpenStdFile {
182
25349
                file: Some(file),
183
25349
                reader: false,
184
25349
                manager: Some(self.clone()),
185
25349
            })
186
        } else {
187
2759
            let file = self.open_for_append(file_id.clone())?;
188
2759
            open_files.insert(file_id.id().unwrap(), FileSlot::Taken);
189
2759
            Ok(OpenStdFile {
190
2759
                file: Some(file),
191
2759
                reader: false,
192
2759
                manager: Some(self.clone()),
193
2759
            })
194
        }
195
28108
    }
196

            
197
13120
    fn file_length(&self, path: impl IntoPathId) -> Result<u64, Error> {
198
13120
        path.path()
199
13120
            .metadata()
200
13120
            .map_err(Error::from)
201
13120
            .map(|metadata| metadata.len())
202
13120
    }
203

            
204
    fn exists(&self, path: impl IntoPathId) -> Result<bool, crate::Error> {
205
14
        if let Some(path_id) = self.resolve_path(path.path(), false) {
206
            {
207
14
                let open_files = self.open_files.lock();
208
14
                if open_files.contains_key(&path_id.id().unwrap()) {
209
                    return Ok(true);
210
14
                }
211
14
            }
212
14
            {
213
14
                let reader_files = self.reader_files.lock();
214
14
                if reader_files.contains_key(&path_id.id().unwrap()) {
215
2
                    return Ok(true);
216
12
                }
217
            }
218
        }
219

            
220
        // Not already open, just ask the filesystem
221
12
        Ok(path.path().exists())
222
14
    }
223

            
224
    fn close_handles<F: FnOnce(PathId)>(&self, path: impl IntoPathId, publish_callback: F) {
225
2703
        if let Some(result) = self.file_ids.recreate_file_id_for_path(path) {
226
2703
            let mut open_files = self.open_files.lock();
227
2703
            let mut reader_files = self.reader_files.lock();
228
2703
            open_files.remove(&result.previous_id.id().unwrap());
229
2703
            reader_files.remove(&result.previous_id.id().unwrap());
230
2703
            publish_callback(result.new_id);
231
2703
        }
232
2703
    }
233

            
234
    fn delete(&self, path: impl IntoPathId) -> Result<bool, Error> {
235
        let in_path = path.into_path_id();
236
        let file_id = self.file_ids.remove_file_id_for_path(&in_path);
237
        if let Some(file_id) = file_id {
238
            let mut open_files = self.open_files.lock();
239
            let mut reader_files = self.reader_files.lock();
240
            open_files.remove(&file_id.id().unwrap());
241
            reader_files.remove(&file_id.id().unwrap());
242
        }
243

            
244
        if in_path.path().exists() {
245
            std::fs::remove_file(in_path.path())?;
246
            Ok(true)
247
        } else {
248
            Ok(false)
249
        }
250
    }
251

            
252
    fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error> {
253
        let path = path.as_ref();
254
        let removed_ids = self.file_ids.remove_file_ids_for_path_prefix(path);
255
        let mut open_files = self.open_files.lock();
256
        let mut reader_files = self.reader_files.lock();
257
        for path_id in removed_ids {
258
            open_files.remove(&path_id.id().unwrap());
259
            reader_files.remove(&path_id.id().unwrap());
260
        }
261

            
262
        if path.exists() {
263
            std::fs::remove_dir_all(path)?;
264
        }
265

            
266
        Ok(())
267
    }
268
}
269

            
270
impl ManagedFileOpener<StdFile> for StdFileManager {
271
10055
    fn open_for_read(&self, path: impl IntoPathId + Send) -> Result<StdFile, Error> {
272
10055
        StdFileOpener.open_for_read(path)
273
10055
    }
274

            
275
5462
    fn open_for_append(&self, path: impl IntoPathId + Send) -> Result<StdFile, Error> {
276
5462
        StdFileOpener.open_for_append(path)
277
5462
    }
278
}
279

            
280
/// An open [`StdFile`] that belongs to a [`StdFileManager`].
281
#[derive(Debug)]
282
pub struct OpenStdFile {
283
    file: Option<StdFile>,
284
    manager: Option<StdFileManager>,
285
    reader: bool,
286
}
287

            
288
impl OpenableFile<StdFile> for OpenStdFile {
289
29180
    fn id(&self) -> &PathId {
290
29180
        self.file.as_ref().unwrap().id()
291
29180
    }
292

            
293
2703
    fn replace_with<C: FnOnce(PathId)>(
294
2703
        self,
295
2703
        replacement: StdFile,
296
2703
        manager: &StdFileManager,
297
2703
        publish_callback: C,
298
2703
    ) -> Result<Self, Error> {
299
2703
        let current_path = self.file.as_ref().unwrap().id.path.clone();
300
2703
        self.close()?;
301
2703
        let path = replacement.id.clone();
302
2703
        replacement.close()?;
303

            
304
2703
        std::fs::rename(path.path(), current_path.path())?;
305
2703
        manager.close_handles(&current_path, publish_callback);
306
2703
        manager.append(current_path)
307
2703
    }
308

            
309
5703
    fn close(self) -> Result<(), Error> {
310
5703
        drop(self);
311
5703
        Ok(())
312
5703
    }
313
}
314

            
315
impl OperableFile<StdFile> for OpenStdFile {
316
152550
    fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output {
317
152550
        operator.execute(self.file.as_mut().unwrap())
318
152550
    }
319
}
320

            
321
impl Drop for OpenStdFile {
322
38123
    fn drop(&mut self) {
323
38123
        if let Some(manager) = &self.manager {
324
38123
            let file = self.file.take().unwrap();
325
38123
            if let Some(file_id) = file.id.id() {
326
38123
                if self.reader {
327
10015
                    let mut reader_files = manager.reader_files.lock();
328
10015
                    if let Some(path_files) = reader_files.get_mut(&file_id) {
329
10015
                        path_files.push_front(file);
330
10015
                    }
331
                } else {
332
28108
                    let mut writer_files = manager.open_files.lock();
333
28108
                    if let Some(writer_file) = writer_files.get_mut(&file_id) {
334
28108
                        match writer_file {
335
                            FileSlot::Available(_) => unreachable!(),
336
23611
                            FileSlot::Taken => {
337
23611
                                *writer_file = FileSlot::Available(file);
338
23611
                            }
339
4497
                            FileSlot::Waiting(sender) => {
340
4497
                                if let Err(flume::SendError(file)) = sender.send(file) {
341
1641
                                    *writer_file = FileSlot::Available(file);
342
2856
                                }
343
                            }
344
                        }
345
                    }
346
                }
347
            }
348
        }
349
38123
    }
350
}