1
use std::{
2
    collections::HashMap,
3
    io::{self, SeekFrom},
4
    ops::Neg,
5
    path::{Path, PathBuf},
6
    sync::{Arc, Weak},
7
};
8

            
9
use once_cell::sync::Lazy;
10
use parking_lot::{Mutex, RwLock};
11

            
12
use super::{FileManager, FileOp, ManagedFile, OpenableFile};
13
use crate::{
14
    error::Error,
15
    io::{File, IntoPathId, ManagedFileOpener, OperableFile, PathId, PathIds},
16
    ErrorKind,
17
};
18

            
19
type FileBuffer = Arc<RwLock<Vec<u8>>>;
20

            
21
/// A fake "file" represented by an in-memory buffer. This should only be used
22
/// in testing, as this database format is not optimized for memory efficiency.
23
#[derive(Clone)]
24
pub struct MemoryFile {
25
    id: PathId,
26
    buffer: FileBuffer,
27
    position: usize,
28
}
29

            
30
impl std::fmt::Debug for MemoryFile {
31
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32
        let buffer = self.buffer.read();
33
        f.debug_struct("MemoryFile")
34
            .field("id", &self.id)
35
            .field("buffer", &buffer.len())
36
            .field("position", &self.position)
37
            .finish()
38
    }
39
}
40

            
41
type OpenBuffers = Arc<Mutex<HashMap<Arc<PathBuf>, Weak<RwLock<Vec<u8>>>>>>;
42
static OPEN_BUFFERS: Lazy<OpenBuffers> = Lazy::new(Arc::default);
43

            
44
#[allow(clippy::needless_pass_by_value)]
45
4250
fn lookup_buffer(path: &PathId, create_if_not_found: bool) -> Option<Arc<RwLock<Vec<u8>>>> {
46
4250
    let mut open_buffers = OPEN_BUFFERS.lock();
47
4250
    if let Some(existing_buffer) = open_buffers.get(&path.path).and_then(Weak::upgrade) {
48
845
        Some(existing_buffer)
49
3405
    } else if create_if_not_found {
50
3405
        let new_buffer = Arc::default();
51
3405
        open_buffers.insert(path.path.clone(), Arc::downgrade(&new_buffer));
52
3405
        Some(new_buffer)
53
    } else {
54
        None
55
    }
56
4250
}
57

            
58
impl ManagedFile for MemoryFile {
59
    type Manager = MemoryFileManager;
60
}
61

            
62
#[allow(clippy::cast_possible_truncation)]
63
impl super::File for MemoryFile {
64
2395085
    fn id(&self) -> &PathId {
65
2395085
        &self.id
66
2395085
    }
67

            
68
129245
    fn length(&self) -> Result<u64, Error> {
69
129245
        let file_buffer = self.buffer.read();
70
129245
        Ok(file_buffer.len() as u64)
71
129245
    }
72

            
73
39442
    fn synchronize(&mut self) -> Result<(), Error> {
74
39442
        Ok(())
75
39442
    }
76

            
77
    fn close(self) -> Result<(), Error> {
78
        Ok(())
79
    }
80
}
81

            
82
/// A [`ManagedFileOpener`] implementation that produces [`MemoryFile`]s.
83
pub struct MemoryFileOpener;
84

            
85
impl ManagedFileOpener<MemoryFile> for MemoryFileOpener {
86
42
    fn open_for_read(&self, path: impl IntoPathId + Send) -> Result<MemoryFile, Error> {
87
42
        let id = path.into_path_id();
88
42
        let buffer = lookup_buffer(&id, true).unwrap();
89
42
        Ok(MemoryFile {
90
42
            id,
91
42
            buffer,
92
42
            position: 0,
93
42
        })
94
42
    }
95

            
96
4208
    fn open_for_append(&self, path: impl IntoPathId + Send) -> Result<MemoryFile, Error> {
97
4208
        let id = path.into_path_id();
98
4208
        let buffer = lookup_buffer(&id, true).unwrap();
99
4208
        let position = {
100
4208
            let buffer = buffer.read();
101
4208
            buffer.len()
102
4208
        };
103
4208
        Ok(MemoryFile {
104
4208
            id,
105
4208
            buffer,
106
4208
            position,
107
4208
        })
108
4208
    }
109
}
110

            
111
impl std::io::Seek for MemoryFile {
112
1578679
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
113
1578679
        match pos {
114
1081695
            SeekFrom::Start(position) => self.position = usize::try_from(position).unwrap(),
115
            SeekFrom::End(from_end) => {
116
                let buffer = self.buffer.read();
117
                self.position = if from_end.is_positive() {
118
                    buffer.len()
119
                } else {
120
                    buffer.len() - usize::try_from(from_end.neg()).unwrap()
121
                };
122
            }
123
496984
            SeekFrom::Current(relative) => {
124
496984
                self.position = if relative.is_positive() {
125
496984
                    self.position + usize::try_from(relative).unwrap()
126
                } else {
127
                    self.position - usize::try_from(relative.neg()).unwrap()
128
                }
129
            }
130
        }
131
1578679
        Ok(self.position as u64)
132
1578679
    }
133
}
134

            
135
impl std::io::Read for MemoryFile {
136
2384373
    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
137
2384373
        let file_buffer = self.buffer.read();
138
2384373

            
139
2384373
        let read_end = self.position + buffer.len();
140
2384373
        if read_end > file_buffer.len() {
141
            return Err(io::Error::new(
142
                io::ErrorKind::UnexpectedEof,
143
                ErrorKind::message("read requested more bytes than available"),
144
            ));
145
2384373
        }
146
2384373

            
147
2384373
        buffer.copy_from_slice(&file_buffer[self.position..read_end]);
148
2384373
        self.position = read_end;
149
2384373

            
150
2384373
        Ok(buffer.len())
151
2384373
    }
152
}
153

            
154
impl std::io::Write for MemoryFile {
155
112095
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156
112095
        let mut file_buffer = self.buffer.write();
157
112095

            
158
112095
        file_buffer.extend_from_slice(buf);
159
112095
        self.position += buf.len();
160
112095

            
161
112095
        Ok(buf.len())
162
112095
    }
163

            
164
    fn flush(&mut self) -> io::Result<()> {
165
        Ok(())
166
    }
167
}
168

            
169
/// The [`FileManager`] implementation for [`MemoryFile`]. Simulates a
170
/// persistent in-memory filesystem.
171
80723
#[derive(Debug, Default, Clone)]
172
pub struct MemoryFileManager {
173
    file_ids: PathIds,
174
    open_files: Arc<Mutex<HashMap<u64, FileBuffer>>>,
175
}
176

            
177
impl MemoryFileManager {
178
27375
    fn lookup_file(
179
27375
        &self,
180
27375
        path: impl IntoPathId,
181
27375
        create_if_needed: bool,
182
27375
    ) -> Result<Option<MemoryFile>, Error> {
183
27375
        let path = path.into_path_id();
184
27375
        let id = match self.file_ids.file_id_for_path(path, create_if_needed) {
185
27362
            Some(id) => id,
186
13
            None => return Ok(None),
187
        };
188
27362
        let mut open_files = self.open_files.lock();
189
27362
        if let Some(open_file) = open_files.get(&id.id().unwrap()) {
190
26517
            Ok(Some(MemoryFile {
191
26517
                id,
192
26517
                buffer: open_file.clone(),
193
26517
                position: 0,
194
26517
            }))
195
845
        } else if create_if_needed {
196
840
            let file = MemoryFileOpener.open_for_append(id.clone())?;
197
840
            open_files.insert(id.id().unwrap(), file.buffer.clone());
198
840
            Ok(Some(file))
199
        } else {
200
5
            Ok(None)
201
        }
202
27375
    }
203

            
204
    fn forget_file(&self, path: PathId) -> bool {
205
1606
        if let Some(id) = self.file_ids.remove_file_id_for_path(path) {
206
803
            let mut open_files = self.open_files.lock();
207
803
            open_files.remove(&id.id().unwrap()).is_some()
208
        } else {
209
803
            false
210
        }
211
1606
    }
212
}
213

            
214
impl FileManager for MemoryFileManager {
215
    type File = MemoryFile;
216
    type FileHandle = OpenMemoryFile;
217

            
218
5
    fn resolve_path(&self, path: impl AsRef<Path>, create_if_not_found: bool) -> Option<PathId> {
219
5
        self.file_ids
220
5
            .file_id_for_path(path.as_ref(), create_if_not_found)
221
5
    }
222

            
223
8104
    fn read(&self, path: impl IntoPathId) -> Result<Self::FileHandle, Error> {
224
8104
        self.append(path)
225
8104
    }
226

            
227
24281
    fn append(&self, path: impl IntoPathId) -> Result<Self::FileHandle, Error> {
228
24281
        self.lookup_file(path, true).map(|file| OpenMemoryFile {
229
24281
            file: file.unwrap(),
230
24281
            manager: self.clone(),
231
24281
        })
232
24281
    }
233

            
234
3089
    fn file_length(&self, path: impl IntoPathId) -> Result<u64, Error> {
235
3089
        let file = self.lookup_file(path, false)?.ok_or_else(|| {
236
13
            ErrorKind::Io(io::Error::new(
237
13
                io::ErrorKind::NotFound,
238
13
                ErrorKind::message("not found"),
239
13
            ))
240
3089
        })?;
241
3076
        file.length()
242
3089
    }
243

            
244
5
    fn exists(&self, path: impl IntoPathId) -> Result<bool, Error> {
245
5
        Ok(self.lookup_file(path, false)?.is_some())
246
5
    }
247

            
248
803
    fn close_handles<F: FnOnce(PathId)>(&self, path: impl IntoPathId, publish_callback: F) {
249
803
        let path = path.into_path_id();
250
803
        self.forget_file(path.clone());
251
803
        let new_id = self.file_ids.file_id_for_path(path.path(), true).unwrap();
252
803
        publish_callback(new_id);
253
803
    }
254

            
255
803
    fn delete(&self, path: impl IntoPathId) -> Result<bool, Error> {
256
803
        let path = path.into_path_id();
257
803
        {
258
803
            let mut open_buffers = OPEN_BUFFERS.lock();
259
803
            open_buffers.remove(&path.path);
260
803
        }
261
803
        Ok(self.forget_file(path))
262
803
    }
263

            
264
    fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), Error> {
265
        let path = path.as_ref();
266
        let removed_ids = self.file_ids.remove_file_ids_for_path_prefix(path);
267
        let mut open_files = self.open_files.lock();
268
        for id in removed_ids {
269
            open_files.remove(&id.id().unwrap());
270
        }
271

            
272
        Ok(())
273
    }
274
}
275

            
276
impl ManagedFileOpener<MemoryFile> for MemoryFileManager {
277
42
    fn open_for_read(&self, path: impl IntoPathId + Send) -> Result<MemoryFile, Error> {
278
42
        MemoryFileOpener.open_for_read(path)
279
42
    }
280

            
281
803
    fn open_for_append(&self, path: impl IntoPathId + Send) -> Result<MemoryFile, Error> {
282
803
        MemoryFileOpener.open_for_append(path)
283
803
    }
284
}
285

            
286
/// An open [`MemoryFile`] that is owned by a [`MemoryFileManager`].
287
#[derive(Debug)]
288
pub struct OpenMemoryFile {
289
    file: MemoryFile,
290
    manager: MemoryFileManager,
291
}
292

            
293
impl OpenableFile<MemoryFile> for OpenMemoryFile {
294
17250
    fn id(&self) -> &PathId {
295
17250
        &self.file.id
296
17250
    }
297

            
298
803
    fn replace_with<C: FnOnce(PathId)>(
299
803
        self,
300
803
        replacement: MemoryFile,
301
803
        manager: &MemoryFileManager,
302
803
        publish_callback: C,
303
803
    ) -> Result<Self, Error> {
304
803
        let weak_buffer = Arc::downgrade(&replacement.buffer);
305
803
        drop(self.manager.delete(replacement.id()));
306
803
        {
307
803
            let mut open_buffers = OPEN_BUFFERS.lock();
308
803
            open_buffers.insert(self.file.id.path.clone(), weak_buffer);
309
803
        }
310
803
        manager.close_handles(&self.file.id, publish_callback);
311

            
312
803
        let new_file = manager.append(&self.file.id.path)?;
313
        {
314
803
            assert!(Arc::ptr_eq(&new_file.file.buffer, &replacement.buffer));
315
        }
316
803
        Ok(new_file)
317
803
    }
318

            
319
3000
    fn close(self) -> Result<(), Error> {
320
3000
        drop(self);
321
3000
        Ok(())
322
3000
    }
323
}
324

            
325
impl OperableFile<MemoryFile> for OpenMemoryFile {
326
82144
    fn execute<Output, Op: FileOp<Output>>(&mut self, operator: Op) -> Output {
327
82144
        operator.execute(&mut self.file)
328
82144
    }
329
}