ab_direct_io_file/
lib.rs

//! Cross-platform APIs for working with files using direct I/O.
//!
//! Depending on the OS, this will use direct, unbuffered or uncached passthrough file
//! reads/writes, bypassing as much of the OS machinery as possible.
//!
//! NOTE: There are major alignment requirements described here:
//! <https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements>
//! <https://man7.org/linux/man-pages/man2/open.2.html>
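//!
//! A minimal round-trip sketch (the file path is illustrative):
//!
//! ```no_run
//! use ab_direct_io_file::DirectIoFile;
//! use std::fs::OpenOptions;
//!
//! fn main() -> std::io::Result<()> {
//!     let mut options = OpenOptions::new();
//!     options.read(true).write(true).create(true);
//!     let file = DirectIoFile::open(options, "example.bin")?;
//!
//!     // Make sure the touched pages exist: unaligned writes read-modify-write whole pages
//!     file.set_len(4096)?;
//!
//!     // Neither offsets nor lengths need to be page-aligned for the buffered helpers
//!     file.write_all_at(b"hello", 3)?;
//!     let mut buf = [0u8; 5];
//!     file.read_exact_at(&mut buf, 3)?;
//!     assert_eq!(&buf, b"hello");
//!     Ok(())
//! }
//! ```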

// TODO: Windows shims are incomplete under Miri: https://github.com/rust-lang/miri/issues/3482
#[cfg(all(test, not(all(miri, windows))))]
mod tests;

use parking_lot::Mutex;
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::{io, mem};

/// 4096 is a relatively safe size due to the sector size on SSDs commonly being 512 or 4096 bytes
pub const DISK_PAGE_SIZE: usize = 4096;
/// Restrict how much data to read from the disk in a single call to avoid very large memory usage
const MAX_READ_SIZE: usize = 1024 * 1024;

const _: () = {
    assert!(MAX_READ_SIZE.is_multiple_of(AlignedPageSize::SIZE));
};

/// A wrapper data structure with 4096-byte alignment, which is the most common alignment for
/// direct I/O operations.
#[derive(Debug, Copy, Clone)]
#[repr(C, align(4096))]
pub struct AlignedPageSize([u8; AlignedPageSize::SIZE]);

const _: () = {
    assert!(align_of::<AlignedPageSize>() == AlignedPageSize::SIZE);
};

impl Default for AlignedPageSize {
    #[inline(always)]
    fn default() -> Self {
        Self([0; AlignedPageSize::SIZE])
    }
}

impl AlignedPageSize {
    /// 4096 is a relatively safe size due to the sector size on SSDs commonly being 512 or 4096
    /// bytes
    pub const SIZE: usize = 4096;

    /// Convenient conversion from slice to underlying representation for efficiency purposes
    #[inline(always)]
    pub fn slice_to_repr(value: &[Self]) -> &[[u8; AlignedPageSize::SIZE]] {
        // SAFETY: `AlignedPageSize` is `#[repr(C)]` and guaranteed to have the same memory layout
        unsafe { mem::transmute(value) }
    }

    /// Convenient conversion from a slice of underlying representation for efficiency purposes.
    ///
    /// Returns `None` if not correctly aligned.
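    ///
    /// A short round-trip sketch:
    ///
    /// ```
    /// use ab_direct_io_file::AlignedPageSize;
    ///
    /// let pages = [AlignedPageSize::default(); 2];
    /// let repr = AlignedPageSize::slice_to_repr(&pages);
    /// let back = AlignedPageSize::try_slice_from_repr(repr).expect("Already aligned");
    /// assert_eq!(back.len(), 2);
    /// ```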
    #[inline]
    pub fn try_slice_from_repr(value: &[[u8; AlignedPageSize::SIZE]]) -> Option<&[Self]> {
        // SAFETY: All bit patterns are valid
        let (before, slice, after) = unsafe { value.align_to::<Self>() };

        if before.is_empty() && after.is_empty() {
            Some(slice)
        } else {
            None
        }
    }

    /// Convenient conversion from mutable slice to underlying representation for efficiency
    /// purposes
    #[inline(always)]
    pub fn slice_mut_to_repr(slice: &mut [Self]) -> &mut [[u8; AlignedPageSize::SIZE]] {
        // SAFETY: `AlignedPageSize` is `#[repr(C)]` and its alignment is larger than inner value
        unsafe { mem::transmute(slice) }
    }

    /// Convenient conversion from a mutable slice of underlying representation for efficiency
    /// purposes.
    ///
    /// Returns `None` if not correctly aligned.
    #[inline]
    pub fn try_slice_mut_from_repr(
        value: &mut [[u8; AlignedPageSize::SIZE]],
    ) -> Option<&mut [Self]> {
        // SAFETY: All bit patterns are valid
        let (before, slice, after) = unsafe { value.align_to_mut::<Self>() };

        if before.is_empty() && after.is_empty() {
            Some(slice)
        } else {
            None
        }
    }
}

/// Wrapper data structure for direct/unbuffered/uncached I/O.
///
/// Depending on the OS, this will use direct, unbuffered or uncached passthrough file
/// reads/writes, bypassing as much of the OS machinery as possible.
///
/// NOTE: There are major alignment requirements described here:
/// <https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements>
/// <https://man7.org/linux/man-pages/man2/open.2.html>
#[derive(Debug)]
pub struct DirectIoFile {
    file: File,
    /// Scratch buffer of aligned memory for reads and writes
    scratch_buffer: Mutex<Vec<AlignedPageSize>>,
}

impl DirectIoFile {
    /// Open a file with basic open options at the specified path for direct/unbuffered I/O for
    /// reads and writes.
    ///
    /// `options` allows configuring things like read/write/create/truncate, but custom flags will
    /// be overridden internally with those required for direct/unbuffered I/O.
    ///
    /// This is especially important on Windows, where unbuffered I/O prevents the system file
    /// cache from consuming huge amounts of memory.
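    ///
    /// A minimal sketch of opening an existing file for reads (the path is illustrative):
    ///
    /// ```no_run
    /// use ab_direct_io_file::DirectIoFile;
    /// use std::fs::OpenOptions;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut options = OpenOptions::new();
    /// options.read(true);
    /// let file = DirectIoFile::open(options, "data.bin")?;
    /// println!("File size: {} bytes", file.len()?);
    /// # Ok(())
    /// # }
    /// ```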
    #[inline]
    pub fn open<P>(
        #[cfg(any(target_os = "linux", windows))] mut options: OpenOptions,
        #[cfg(not(any(target_os = "linux", windows)))] options: OpenOptions,
        path: P,
    ) -> io::Result<Self>
    where
        P: AsRef<Path>,
    {
        // Direct I/O on Linux
        #[cfg(target_os = "linux")]
        // TODO: Unlock under Miri once supported: https://github.com/rust-lang/miri/issues/4462
        if !cfg!(miri) {
            use std::os::unix::fs::OpenOptionsExt;

            options.custom_flags(libc::O_DIRECT);
        }
        // Unbuffered write-through on Windows
        #[cfg(windows)]
        // TODO: Unlock under Miri once supported: https://github.com/rust-lang/miri/issues/4462
        if !cfg!(miri) {
            use std::os::windows::fs::OpenOptionsExt;

            options.custom_flags(
                windows::Win32::Storage::FileSystem::FILE_FLAG_WRITE_THROUGH.0
                    | windows::Win32::Storage::FileSystem::FILE_FLAG_NO_BUFFERING.0,
            );
        }
        let file = options.open(path)?;

        // Disable caching on macOS
        #[cfg(target_os = "macos")]
        // TODO: Unlock under Miri once supported: https://github.com/rust-lang/miri/issues/4462
        if !cfg!(miri) {
            use std::os::unix::io::AsRawFd;

            // SAFETY: FFI call with correct file descriptor and arguments
            if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_NOCACHE, 1) } != 0 {
                return Err(io::Error::last_os_error());
            }
        }

        Ok(Self {
            file,
            // In many cases, we'll want to read this much at once, so pre-allocate it right away
            scratch_buffer: Mutex::new(vec![
                AlignedPageSize::default();
                MAX_READ_SIZE / AlignedPageSize::SIZE
            ]),
        })
    }

    /// Get file size
    #[inline]
    pub fn len(&self) -> io::Result<u64> {
        Ok(self.file.metadata()?.len())
    }

    /// Returns `Ok(true)` if the file is empty
    #[inline]
    pub fn is_empty(&self) -> io::Result<bool> {
        Ok(self.len()? == 0)
    }

    /// Make sure the file has the specified number of bytes allocated on disk.
    ///
    /// Later writes within `len` will not fail due to lack of disk space.
    #[inline(always)]
    pub fn allocate(&self, len: u64) -> io::Result<()> {
        fs2::FileExt::allocate(&self.file, len)
    }

    /// Truncates or extends the underlying file, updating its size to become `len`.
    ///
    /// Note that if `len` is larger than the previous file size, it will result in a sparse file.
    /// If you'd like to pre-allocate space on disk, use [`Self::allocate()`], which may be followed
    /// by this method to truncate the file if the new file size is smaller than the previous one
    /// ([`Self::allocate()`] doesn't truncate the file).
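    ///
    /// A minimal sketch (sizes are arbitrary): pre-allocate space up front, then shrink the file
    /// to its final size once known:
    ///
    /// ```no_run
    /// # use ab_direct_io_file::DirectIoFile;
    /// # use std::fs::OpenOptions;
    /// # fn main() -> std::io::Result<()> {
    /// # let mut options = OpenOptions::new();
    /// # options.read(true).write(true).create(true);
    /// # let file = DirectIoFile::open(options, "data.bin")?;
    /// file.allocate(10 * 1024 * 1024)?;
    /// // ... writes ...
    /// file.set_len(4096)?;
    /// # Ok(())
    /// # }
    /// ```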
    #[inline(always)]
    pub fn set_len(&self, len: u64) -> io::Result<()> {
        self.file.set_len(len)
    }

    /// Read the exact number of bytes needed to fill `buf` at `offset`.
    ///
    /// NOTE: This uses locking and buffering internally, prefer [`Self::read_exact_at_raw()`] if
    /// you can control data alignment.
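    ///
    /// A minimal sketch of an unaligned read (assumes the file is long enough):
    ///
    /// ```no_run
    /// # use ab_direct_io_file::DirectIoFile;
    /// # use std::fs::OpenOptions;
    /// # fn main() -> std::io::Result<()> {
    /// # let mut options = OpenOptions::new();
    /// # options.read(true);
    /// # let file = DirectIoFile::open(options, "data.bin")?;
    /// // Neither the offset nor the buffer length needs to be page-aligned
    /// let mut buf = [0u8; 100];
    /// file.read_exact_at(&mut buf, 10)?;
    /// # Ok(())
    /// # }
    /// ```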
    pub fn read_exact_at(&self, buf: &mut [u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // This is guaranteed by the constructor
        debug_assert!(
            AlignedPageSize::slice_to_repr(&scratch_buffer)
                .as_flattened()
                .len()
                == MAX_READ_SIZE
        );

        // First, read up to `MAX_READ_SIZE - padding` bytes
        let padding = (offset % AlignedPageSize::SIZE as u64) as usize;
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size);
        {
            let bytes_to_read = unaligned_start.len();
            unaligned_start.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time
        for buf in buf.chunks_mut(MAX_READ_SIZE) {
            let bytes_to_read = buf.len();
            buf.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += buf.len() as u64;
        }

        Ok(())
    }

    /// Write all bytes from `buf` at `offset`.
    ///
    /// NOTE: This uses locking and buffering internally, prefer [`Self::write_all_at_raw()`] if you
    /// can control data alignment.
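    ///
    /// A minimal sketch of an unaligned write (assumes the touched pages already exist, e.g. via
    /// [`Self::set_len()`] or [`Self::allocate()`]):
    ///
    /// ```no_run
    /// # use ab_direct_io_file::DirectIoFile;
    /// # use std::fs::OpenOptions;
    /// # fn main() -> std::io::Result<()> {
    /// # let mut options = OpenOptions::new();
    /// # options.read(true).write(true).create(true);
    /// # let file = DirectIoFile::open(options, "data.bin")?;
    /// file.set_len(4096)?;
    /// // Unaligned writes become read-modify-write of whole pages internally
    /// file.write_all_at(b"hello", 10)?;
    /// # Ok(())
    /// # }
    /// ```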
    pub fn write_all_at(&self, buf: &[u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // This is guaranteed by the constructor
        debug_assert!(
            AlignedPageSize::slice_to_repr(&scratch_buffer)
                .as_flattened()
                .len()
                == MAX_READ_SIZE
        );

        // First, write up to `MAX_READ_SIZE - padding` bytes
        let padding = (offset % AlignedPageSize::SIZE as u64) as usize;
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size);
        {
            self.write_all_at_internal(&mut scratch_buffer, unaligned_start, offset)?;
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time
        for buf in buf.chunks(MAX_READ_SIZE) {
            self.write_all_at_internal(&mut scratch_buffer, buf, offset)?;
            offset += buf.len() as u64;
        }

        Ok(())
    }

    /// Low-level reading into aligned memory.
    ///
    /// `offset` needs to be page-aligned too; otherwise use [`Self::read_exact_at()`] if you're
    /// willing to pay for the corresponding overhead.
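    ///
    /// A minimal sketch (assumes the file is at least one page long):
    ///
    /// ```no_run
    /// use ab_direct_io_file::{AlignedPageSize, DirectIoFile};
    /// # use std::fs::OpenOptions;
    /// # fn main() -> std::io::Result<()> {
    /// # let mut options = OpenOptions::new();
    /// # options.read(true);
    /// # let file = DirectIoFile::open(options, "data.bin")?;
    /// // One page of aligned memory, read from a page-aligned offset
    /// let mut pages = [AlignedPageSize::default(); 1];
    /// file.read_exact_at_raw(&mut pages, 0)?;
    /// # Ok(())
    /// # }
    /// ```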
    #[inline]
    pub fn read_exact_at_raw(&self, buf: &mut [AlignedPageSize], offset: u64) -> io::Result<()> {
        let buf = AlignedPageSize::slice_mut_to_repr(buf).as_flattened_mut();

        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;

            self.file.read_exact_at(buf, offset)
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;

            let mut buf = buf;
            let mut offset = offset;
            while !buf.is_empty() {
                match self.file.seek_read(buf, offset) {
                    Ok(0) => {
                        break;
                    }
                    Ok(n) => {
                        buf = &mut buf[n..];
                        offset += n as u64;
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                        // Try again
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }

            if !buf.is_empty() {
                Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "failed to fill the whole buffer",
                ))
            } else {
                Ok(())
            }
        }
    }

    /// Low-level writing from aligned memory.
    ///
    /// `offset` needs to be page-aligned too; otherwise use [`Self::write_all_at()`] if you're
    /// willing to pay for the corresponding overhead.
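    ///
    /// A minimal sketch: write one zeroed page at a page-aligned offset:
    ///
    /// ```no_run
    /// use ab_direct_io_file::{AlignedPageSize, DirectIoFile};
    /// # use std::fs::OpenOptions;
    /// # fn main() -> std::io::Result<()> {
    /// # let mut options = OpenOptions::new();
    /// # options.write(true).create(true);
    /// # let file = DirectIoFile::open(options, "data.bin")?;
    /// let pages = [AlignedPageSize::default(); 1];
    /// file.write_all_at_raw(&pages, AlignedPageSize::SIZE as u64)?;
    /// # Ok(())
    /// # }
    /// ```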
    #[inline]
    pub fn write_all_at_raw(&self, buf: &[AlignedPageSize], offset: u64) -> io::Result<()> {
        let buf = AlignedPageSize::slice_to_repr(buf).as_flattened();

        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;

            self.file.write_all_at(buf, offset)
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;

            let mut buf = buf;
            let mut offset = offset;
            while !buf.is_empty() {
                match self.file.seek_write(buf, offset) {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "failed to write whole buffer",
                        ));
                    }
                    Ok(n) => {
                        buf = &buf[n..];
                        offset += n as u64;
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                        // Try again
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }

            Ok(())
        }
    }

    /// Access internal [`File`] instance
    #[inline(always)]
    pub fn file(&self) -> &File {
        &self.file
    }

    fn read_exact_at_internal<'a>(
        &self,
        scratch_buffer: &'a mut [AlignedPageSize],
        bytes_to_read: usize,
        offset: u64,
    ) -> io::Result<&'a [u8]> {
        let page_aligned_offset =
            offset / AlignedPageSize::SIZE as u64 * AlignedPageSize::SIZE as u64;
        let padding = (offset - page_aligned_offset) as usize;

        // Make a scratch buffer of a size that is necessary to read aligned memory, accounting
        // for extra bytes at the beginning and the end that will be thrown away
        let pages_to_read = (padding + bytes_to_read).div_ceil(AlignedPageSize::SIZE);
        let scratch_buffer = &mut scratch_buffer[..pages_to_read];

        self.read_exact_at_raw(scratch_buffer, page_aligned_offset)?;

        Ok(
            &AlignedPageSize::slice_to_repr(scratch_buffer).as_flattened()[padding..]
                [..bytes_to_read],
        )
    }
418
419    /// Panics on writes over `MAX_READ_SIZE` (including padding on both ends)
420    fn write_all_at_internal(
421        &self,
422        scratch_buffer: &mut [AlignedPageSize],
423        bytes_to_write: &[u8],
424        offset: u64,
425    ) -> io::Result<()> {
426        let page_aligned_offset =
427            offset / AlignedPageSize::SIZE as u64 * AlignedPageSize::SIZE as u64;
428        let padding = (offset - page_aligned_offset) as usize;
429
430        // Calculate the size of the read including padding on both ends
431        let pages_to_read = (padding + bytes_to_write.len()).div_ceil(AlignedPageSize::SIZE);
432
433        if padding == 0 && pages_to_read == bytes_to_write.len() {
434            let scratch_buffer = &mut scratch_buffer[..pages_to_read];
435            AlignedPageSize::slice_mut_to_repr(scratch_buffer)
436                .as_flattened_mut()
437                .copy_from_slice(bytes_to_write);
438            self.write_all_at_raw(scratch_buffer, offset)?;
439        } else {
440            let scratch_buffer = &mut scratch_buffer[..pages_to_read];
441            // Read whole pages where `bytes_to_write` will be written
442            self.read_exact_at_raw(scratch_buffer, page_aligned_offset)?;
443            // Update the contents of existing pages and write into the file
444            AlignedPageSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut()[padding..]
445                [..bytes_to_write.len()]
446                .copy_from_slice(bytes_to_write);
447            self.write_all_at_raw(scratch_buffer, page_aligned_offset)?;
448        }
449
450        Ok(())
451    }
452}