
ab_direct_io_file/lib.rs

//! Cross-platform APIs for working with files using direct I/O.
//!
//! Depending on the OS, this uses direct, unbuffered, or uncached passthrough file reads/writes,
//! bypassing as much of the OS machinery as possible.
//!
//! NOTE: There are major alignment requirements described here:
//! <https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements>
//! <https://man7.org/linux/man-pages/man2/open.2.html>
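//!
//! A minimal usage sketch (`no_run`; the path, options, and sizing below are illustrative
//! assumptions, not requirements of this crate):
//! ```no_run
//! # use ab_direct_io_file::{AlignedPage, DirectIoFile};
//! # use std::fs::OpenOptions;
//! let mut options = OpenOptions::new();
//! options.read(true).write(true).create(true);
//! let file = DirectIoFile::open(options, "example.bin")?;
//! // Size the file first: unaligned writes read-modify-write whole pages
//! file.set_len(AlignedPage::SIZE as u64)?;
//! file.write_all_at(b"hello", 10)?;
//! let mut buf = [0u8; 5];
//! file.read_exact_at(&mut buf, 10)?;
//! assert_eq!(&buf, b"hello");
//! # Ok::<(), std::io::Error>(())
//! ```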

#![feature(const_block_items)]

// TODO: Windows shims are incomplete under Miri: https://github.com/rust-lang/miri/issues/3482
#[cfg(all(test, not(all(miri, windows))))]
mod tests;

use parking_lot::Mutex;
use std::fs::{File, OpenOptions};
use std::mem::MaybeUninit;
use std::path::Path;
use std::{io, mem, slice};

/// 4096 is a relatively safe size because SSD sector sizes are commonly 512 or 4096 bytes
pub const DISK_PAGE_SIZE: usize = 4096;
/// Restrict how much data to read from the disk in a single call to avoid very large memory usage
const MAX_READ_SIZE: usize = 1024 * 1024;

const {
    assert!(MAX_READ_SIZE.is_multiple_of(AlignedPage::SIZE));
}

/// A wrapper data structure with 4096-byte alignment, which is the most common alignment for
/// direct I/O operations.
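///
/// A minimal sketch of the alignment guarantee (the assertion is illustrative):
/// ```
/// # use ab_direct_io_file::AlignedPage;
/// let page = AlignedPage::default();
/// assert_eq!(&page as *const AlignedPage as usize % AlignedPage::SIZE, 0);
/// ```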
#[derive(Debug, Copy, Clone)]
#[repr(C, align(4096))]
pub struct AlignedPage([u8; AlignedPage::SIZE]);

const {
    assert!(align_of::<AlignedPage>() == AlignedPage::SIZE);
}

impl Default for AlignedPage {
    #[inline(always)]
    fn default() -> Self {
        Self([0; AlignedPage::SIZE])
    }
}

impl AlignedPage {
    /// 4096 is a relatively safe size because SSD sector sizes are commonly 512 or 4096 bytes
    pub const SIZE: usize = 4096;

    /// Convert an exclusive slice to an uninitialized version
    pub fn as_uninit_slice_mut(value: &mut [Self]) -> &mut [MaybeUninit<Self>] {
        // SAFETY: Same layout
        unsafe { mem::transmute(value) }
    }

    /// Convenient conversion from slice to underlying representation for efficiency purposes
    #[inline(always)]
    pub fn slice_to_repr(value: &[Self]) -> &[[u8; AlignedPage::SIZE]] {
        // SAFETY: `AlignedPage` is `#[repr(C)]` and guaranteed to have the same memory layout
        unsafe { mem::transmute(value) }
    }

    /// Convenient conversion from slice to underlying representation for efficiency purposes
    #[inline(always)]
    pub fn uninit_slice_to_repr(
        value: &[MaybeUninit<Self>],
    ) -> &[MaybeUninit<[u8; AlignedPage::SIZE]>] {
        // SAFETY: `AlignedPage` is `#[repr(C)]` and guaranteed to have the same memory layout
        unsafe { mem::transmute(value) }
    }

    /// Convenient conversion from a slice of underlying representation for efficiency purposes.
    ///
    /// Returns `None` if not correctly aligned.
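    ///
    /// A round-trip sketch (illustrative; the `repr` borrow keeps the original alignment):
    /// ```
    /// # use ab_direct_io_file::AlignedPage;
    /// let pages = [AlignedPage::default(); 2];
    /// let repr = AlignedPage::slice_to_repr(&pages);
    /// assert!(AlignedPage::try_slice_from_repr(repr).is_some());
    /// ```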
    #[inline]
    pub fn try_slice_from_repr(value: &[[u8; AlignedPage::SIZE]]) -> Option<&[Self]> {
        // SAFETY: All bit patterns are valid
        let (before, slice, after) = unsafe { value.align_to::<Self>() };

        if before.is_empty() && after.is_empty() {
            Some(slice)
        } else {
            None
        }
    }

    /// Convenient conversion from a slice of underlying representation for efficiency purposes.
    ///
    /// Returns `None` if not correctly aligned.
    #[inline]
    pub fn try_uninit_slice_from_repr(
        value: &[MaybeUninit<[u8; AlignedPage::SIZE]>],
    ) -> Option<&[MaybeUninit<Self>]> {
        // SAFETY: All bit patterns are valid
        let (before, slice, after) = unsafe { value.align_to::<MaybeUninit<Self>>() };

        if before.is_empty() && after.is_empty() {
            Some(slice)
        } else {
            None
        }
    }

    /// Convenient conversion from mutable slice to underlying representation for efficiency
    /// purposes
    #[inline(always)]
    pub fn slice_mut_to_repr(slice: &mut [Self]) -> &mut [[u8; AlignedPage::SIZE]] {
        // SAFETY: `AlignedPage` is `#[repr(C)]` and guaranteed to have the same memory layout
        unsafe { mem::transmute(slice) }
    }

    /// Convenient conversion from mutable slice to underlying representation for efficiency
    /// purposes
    #[inline(always)]
    pub fn uninit_slice_mut_to_repr(
        slice: &mut [MaybeUninit<Self>],
    ) -> &mut [MaybeUninit<[u8; AlignedPage::SIZE]>] {
        // SAFETY: `AlignedPage` is `#[repr(C)]` and guaranteed to have the same memory layout
        unsafe { mem::transmute(slice) }
    }

    /// Convenient conversion from a slice of underlying representation for efficiency purposes.
    ///
    /// Returns `None` if not correctly aligned.
    #[inline]
    pub fn try_slice_mut_from_repr(value: &mut [[u8; AlignedPage::SIZE]]) -> Option<&mut [Self]> {
        // SAFETY: All bit patterns are valid
        let (before, slice, after) = unsafe { value.align_to_mut::<Self>() };

        if before.is_empty() && after.is_empty() {
            Some(slice)
        } else {
            None
        }
    }

    /// Convenient conversion from a slice of underlying representation for efficiency purposes.
    ///
    /// Returns `None` if not correctly aligned.
    #[inline]
    pub fn try_uninit_slice_mut_from_repr(
        value: &mut [MaybeUninit<[u8; AlignedPage::SIZE]>],
    ) -> Option<&mut [MaybeUninit<Self>]> {
        // SAFETY: All bit patterns are valid
        let (before, slice, after) = unsafe { value.align_to_mut::<MaybeUninit<Self>>() };

        if before.is_empty() && after.is_empty() {
            Some(slice)
        } else {
            None
        }
    }
}

/// Wrapper data structure for direct/unbuffered/uncached I/O.
///
/// Depending on the OS, this uses direct, unbuffered, or uncached passthrough file reads/writes,
/// bypassing as much of the OS machinery as possible.
///
/// NOTE: There are major alignment requirements described here:
/// <https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements>
/// <https://man7.org/linux/man-pages/man2/open.2.html>
#[derive(Debug)]
pub struct DirectIoFile {
    file: File,
    /// Scratch buffer of aligned memory for reads and writes
    scratch_buffer: Mutex<Vec<AlignedPage>>,
}

impl DirectIoFile {
    /// Open a file with basic open options at the specified path for direct/unbuffered I/O reads
    /// and writes.
    ///
    /// `options` allows configuring things like read/write/create/truncate, but custom flags will
    /// be overridden internally.
    ///
    /// Bypassing the cache is especially important on Windows to prevent huge memory usage.
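    ///
    /// A minimal sketch (`no_run`; the path is an illustrative assumption). Only basic options
    /// need to be set, the direct I/O flags are added internally:
    /// ```no_run
    /// # use ab_direct_io_file::DirectIoFile;
    /// # use std::fs::OpenOptions;
    /// let mut options = OpenOptions::new();
    /// options.read(true).write(true).create(true);
    /// let file = DirectIoFile::open(options, "example.bin")?;
    /// # Ok::<(), std::io::Error>(())
    /// ```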
    #[inline]
    pub fn open<P>(
        #[cfg(any(target_os = "linux", windows))] mut options: OpenOptions,
        #[cfg(not(any(target_os = "linux", windows)))] options: OpenOptions,
        path: P,
    ) -> io::Result<Self>
    where
        P: AsRef<Path>,
    {
        // Direct I/O on Linux
        #[cfg(target_os = "linux")]
        // TODO: Unlock under Miri once supported: https://github.com/rust-lang/miri/issues/4462
        if !cfg!(miri) {
            use std::os::unix::fs::OpenOptionsExt;

            options.custom_flags(libc::O_DIRECT);
        }
        // Unbuffered write-through on Windows
        #[cfg(windows)]
        // TODO: Unlock under Miri once supported: https://github.com/rust-lang/miri/issues/4462
        if !cfg!(miri) {
            use std::os::windows::fs::OpenOptionsExt;

            options.custom_flags(
                windows::Win32::Storage::FileSystem::FILE_FLAG_WRITE_THROUGH.0
                    | windows::Win32::Storage::FileSystem::FILE_FLAG_NO_BUFFERING.0,
            );
        }
        let file = options.open(path)?;

        // Disable caching on macOS
        #[cfg(target_os = "macos")]
        // TODO: Unlock under Miri once supported: https://github.com/rust-lang/miri/issues/4462
        if !cfg!(miri) {
            use std::os::unix::io::AsRawFd;

            // SAFETY: FFI call with correct file descriptor and arguments
            if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_NOCACHE, 1) } != 0 {
                return Err(io::Error::last_os_error());
            }
        }

        Ok(Self {
            file,
            // In many cases, we'll want to read this much at once, so pre-allocate it right away
            scratch_buffer: Mutex::new(vec![
                AlignedPage::default();
                MAX_READ_SIZE / AlignedPage::SIZE
            ]),
        })
    }

    /// Get file size
    #[inline]
    pub fn len(&self) -> io::Result<u64> {
        Ok(self.file.metadata()?.len())
    }

    /// Returns `Ok(true)` if the file is empty
    #[inline]
    pub fn is_empty(&self) -> io::Result<bool> {
        Ok(self.len()? == 0)
    }

    /// Make sure the file has a specified number of bytes allocated on the disk.
    ///
    /// Later writes within `len` will not fail due to lack of disk space.
    #[inline(always)]
    pub fn allocate(&self, len: u64) -> io::Result<()> {
        fs2::FileExt::allocate(&self.file, len)
    }

    /// Truncates or extends the underlying file, updating its size to become `len`.
    ///
    /// Note that if `len` is larger than the previous file size, the result is a sparse file. To
    /// pre-allocate space on disk, use [`Self::allocate()`] instead, optionally followed by this
    /// method if the new file size is smaller than before ([`Self::allocate()`] doesn't truncate
    /// the file).
    #[inline(always)]
    pub fn set_len(&self, len: u64) -> io::Result<()> {
        self.file.set_len(len)
    }

    /// Read the exact number of bytes needed to fill `buf` at `offset`.
    ///
    /// NOTE: This locks and copies through an internal buffer; prefer [`Self::read_exact_at_raw()`]
    /// if you can control data alignment.
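    ///
    /// A minimal sketch (`no_run`; the path, offset, and length are illustrative assumptions):
    /// ```no_run
    /// # use ab_direct_io_file::DirectIoFile;
    /// # use std::fs::OpenOptions;
    /// let file = DirectIoFile::open(OpenOptions::new().read(true).clone(), "example.bin")?;
    /// let mut buf = [0u8; 100];
    /// // Arbitrary offset and length, no alignment required
    /// file.read_exact_at(&mut buf, 5)?;
    /// # Ok::<(), std::io::Error>(())
    /// ```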
    pub fn read_exact_at(&self, buf: &mut [u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // This is guaranteed by the constructor
        debug_assert!(
            AlignedPage::slice_to_repr(&scratch_buffer)
                .as_flattened()
                .len()
                <= MAX_READ_SIZE
        );

        // First read up to `MAX_READ_SIZE - padding`
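        // (trimming `padding` here leaves `offset` page-aligned for every subsequent chunk)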
        let padding = (offset % AlignedPage::SIZE as u64) as usize;
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size);
        {
            let bytes_to_read = unaligned_start.len();
            unaligned_start.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time
        for buf in buf.chunks_mut(MAX_READ_SIZE) {
            let bytes_to_read = buf.len();
            buf.copy_from_slice(self.read_exact_at_internal(
                &mut scratch_buffer,
                bytes_to_read,
                offset,
            )?);
            offset += buf.len() as u64;
        }

        Ok(())
    }

    /// Write all bytes from `buf` at `offset`.
    ///
    /// NOTE: This locks and copies through an internal buffer; prefer [`Self::write_all_at_raw()`]
    /// if you can control data alignment.
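    ///
    /// A minimal sketch (`no_run`; the path and sizing are illustrative assumptions):
    /// ```no_run
    /// # use ab_direct_io_file::DirectIoFile;
    /// # use std::fs::OpenOptions;
    /// let mut options = OpenOptions::new();
    /// options.read(true).write(true).create(true);
    /// let file = DirectIoFile::open(options, "example.bin")?;
    /// // Writes that aren't whole aligned pages are read-modify-write, so size the file first
    /// file.set_len(4096)?;
    /// file.write_all_at(b"hello", 10)?;
    /// # Ok::<(), std::io::Error>(())
    /// ```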
    pub fn write_all_at(&self, buf: &[u8], mut offset: u64) -> io::Result<()> {
        if buf.is_empty() {
            return Ok(());
        }

        let mut scratch_buffer = self.scratch_buffer.lock();

        // This is guaranteed by the constructor
        debug_assert!(
            AlignedPage::slice_to_repr(&scratch_buffer)
                .as_flattened()
                .len()
                <= MAX_READ_SIZE
        );

        // First, write up to `MAX_READ_SIZE - padding`
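        // (trimming `padding` here leaves `offset` page-aligned for every subsequent chunk)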
        let padding = (offset % AlignedPage::SIZE as u64) as usize;
        let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
        let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size);
        {
            self.write_all_at_internal(&mut scratch_buffer, unaligned_start, offset)?;
            offset += unaligned_start.len() as u64;
        }

        if buf.is_empty() {
            return Ok(());
        }

        // Process the rest of the chunks, up to `MAX_READ_SIZE` at a time
        for buf in buf.chunks(MAX_READ_SIZE) {
            self.write_all_at_internal(&mut scratch_buffer, buf, offset)?;
            offset += buf.len() as u64;
        }

        Ok(())
    }

    /// Low-level reading into aligned memory.
    ///
    /// `offset` needs to be page-aligned as well; otherwise, use [`Self::read_exact_at()`] if
    /// you're willing to pay for the corresponding overhead.
    ///
    /// A successful result guarantees that all bytes in `buf` were initialized.
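    ///
    /// A sketch of the raw interface (`no_run`; the path is an illustrative assumption):
    /// ```no_run
    /// # use ab_direct_io_file::{AlignedPage, DirectIoFile};
    /// # use std::fs::OpenOptions;
    /// # use std::mem::MaybeUninit;
    /// let file = DirectIoFile::open(OpenOptions::new().read(true).clone(), "example.bin")?;
    /// let mut pages = [MaybeUninit::<AlignedPage>::uninit(); 2];
    /// // Both the buffer and the offset are page-aligned, so no internal copying happens
    /// file.read_exact_at_raw(&mut pages, 0)?;
    /// # Ok::<(), std::io::Error>(())
    /// ```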
    #[inline]
    pub fn read_exact_at_raw(
        &self,
        buf: &mut [MaybeUninit<AlignedPage>],
        offset: u64,
    ) -> io::Result<()> {
        let buf = AlignedPage::uninit_slice_mut_to_repr(buf);

        // TODO: Switch to APIs from https://github.com/rust-lang/rust/issues/140771 once
        //  implementation lands in nightly
        // SAFETY: `buf` is never read by the Rust internal API, only written to
        let buf = unsafe {
            slice::from_raw_parts_mut(
                buf.as_mut_ptr().cast::<[u8; AlignedPage::SIZE]>(),
                buf.len(),
            )
        };

        let buf = buf.as_flattened_mut();

        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;

            self.file.read_exact_at(buf, offset)
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;

            let mut buf = buf;
            let mut offset = offset;
            while !buf.is_empty() {
                match self.file.seek_read(buf, offset) {
                    Ok(0) => {
                        break;
                    }
                    Ok(n) => {
                        buf = &mut buf[n..];
                        offset += n as u64;
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                        // Try again
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }

            if !buf.is_empty() {
                Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "failed to fill the whole buffer",
                ))
            } else {
                Ok(())
            }
        }
    }

    /// Low-level writing from aligned memory.
    ///
    /// `offset` needs to be page-aligned as well; otherwise, use [`Self::write_all_at()`] if
    /// you're willing to pay for the corresponding overhead.
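    ///
    /// A sketch (`no_run`; the path and contents are illustrative assumptions):
    /// ```no_run
    /// # use ab_direct_io_file::{AlignedPage, DirectIoFile};
    /// # use std::fs::OpenOptions;
    /// let mut options = OpenOptions::new();
    /// options.read(true).write(true).create(true);
    /// let file = DirectIoFile::open(options, "example.bin")?;
    /// // One whole page, written directly without the internal scratch buffer
    /// let pages = [AlignedPage::default(); 1];
    /// file.write_all_at_raw(&pages, 0)?;
    /// # Ok::<(), std::io::Error>(())
    /// ```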
    #[inline]
    pub fn write_all_at_raw(&self, buf: &[AlignedPage], offset: u64) -> io::Result<()> {
        let buf = AlignedPage::slice_to_repr(buf).as_flattened();

        #[cfg(unix)]
        {
            use std::os::unix::fs::FileExt;

            self.file.write_all_at(buf, offset)
        }
        #[cfg(windows)]
        {
            use std::os::windows::fs::FileExt;

            let mut buf = buf;
            let mut offset = offset;
            while !buf.is_empty() {
                match self.file.seek_write(buf, offset) {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "failed to write the whole buffer",
                        ));
                    }
                    Ok(n) => {
                        buf = &buf[n..];
                        offset += n as u64;
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                        // Try again
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }

            Ok(())
        }
    }

    /// Access internal [`File`] instance
    #[inline(always)]
    pub fn file(&self) -> &File {
        &self.file
    }

    fn read_exact_at_internal<'a>(
        &self,
        scratch_buffer: &'a mut [AlignedPage],
        bytes_to_read: usize,
        offset: u64,
    ) -> io::Result<&'a [u8]> {
        let page_aligned_offset = offset / AlignedPage::SIZE as u64 * AlignedPage::SIZE as u64;
        let padding = (offset - page_aligned_offset) as usize;
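        // E.g. (illustrative numbers): `offset == 5000` gives `page_aligned_offset == 4096` and
        // `padding == 904`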

        // Make a scratch buffer of a size that is necessary to read aligned memory, accounting
        // for extra bytes at the beginning and the end that will be thrown away
        let pages_to_read = (padding + bytes_to_read).div_ceil(AlignedPage::SIZE);
        let scratch_buffer = &mut scratch_buffer[..pages_to_read];

        self.read_exact_at_raw(
            AlignedPage::as_uninit_slice_mut(scratch_buffer),
            page_aligned_offset,
        )?;

        Ok(&AlignedPage::slice_to_repr(scratch_buffer).as_flattened()[padding..][..bytes_to_read])
    }

    /// Panics on writes over `MAX_READ_SIZE` (including padding on both ends)
    fn write_all_at_internal(
        &self,
        scratch_buffer: &mut [AlignedPage],
        bytes_to_write: &[u8],
        offset: u64,
    ) -> io::Result<()> {
        let page_aligned_offset = offset / AlignedPage::SIZE as u64 * AlignedPage::SIZE as u64;
        let padding = (offset - page_aligned_offset) as usize;

        // Calculate the number of pages spanned by this write, including padding on both ends
        let pages_to_read = (padding + bytes_to_write.len()).div_ceil(AlignedPage::SIZE);

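        // If the write covers whole pages exactly, it can go straight to disk; otherwise the
        // affected pages are read first and patched (read-modify-write)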
        if padding == 0 && pages_to_read * AlignedPage::SIZE == bytes_to_write.len() {
            let scratch_buffer = &mut scratch_buffer[..pages_to_read];
            AlignedPage::slice_mut_to_repr(scratch_buffer)
                .as_flattened_mut()
                .copy_from_slice(bytes_to_write);
            self.write_all_at_raw(scratch_buffer, offset)?;
        } else {
            let scratch_buffer = &mut scratch_buffer[..pages_to_read];
            // Read whole pages where `bytes_to_write` will be written
            self.read_exact_at_raw(
                AlignedPage::as_uninit_slice_mut(scratch_buffer),
                page_aligned_offset,
            )?;
            // Update the contents of existing pages and write into the file
            AlignedPage::slice_mut_to_repr(scratch_buffer).as_flattened_mut()[padding..]
                [..bytes_to_write.len()]
                .copy_from_slice(bytes_to_write);
            self.write_all_at_raw(scratch_buffer, page_aligned_offset)?;
        }

        Ok(())
    }
}