ab_farmer/single_disk_farm/
piece_reader.rs

1//! Piece reader for single disk farm
2
3use crate::farm::{FarmError, PieceReader};
4use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
5use ab_core_primitives::hashes::Blake3Hash;
6use ab_core_primitives::pieces::{Piece, PieceOffset};
7use ab_core_primitives::sectors::{SectorId, SectorIndex};
8use ab_core_primitives::solutions::ShardCommitmentHash;
9use ab_erasure_coding::ErasureCoding;
10use ab_farmer_components::sector::{SectorMetadataChecksummed, sector_size};
11use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
12use ab_farmer_components::{ReadAt, ReadAtAsync, ReadAtSync, reading};
13use ab_proof_of_space::Table;
14use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
15use async_trait::async_trait;
16use futures::channel::{mpsc, oneshot};
17use futures::{SinkExt, StreamExt};
18use std::collections::HashSet;
19use std::future::Future;
20use std::sync::Arc;
21use tracing::{error, warn};
22
23#[derive(Debug)]
24struct ReadPieceRequest {
25    sector_index: SectorIndex,
26    piece_offset: PieceOffset,
27    response_sender: oneshot::Sender<Option<Piece>>,
28}
29
30/// Wrapper data structure that can be used to read pieces from single disk farm
31#[derive(Debug, Clone)]
32pub struct DiskPieceReader {
33    read_piece_sender: mpsc::Sender<ReadPieceRequest>,
34}
35
36#[async_trait]
37impl PieceReader for DiskPieceReader {
38    #[inline]
39    async fn read_piece(
40        &self,
41        sector_index: SectorIndex,
42        piece_offset: PieceOffset,
43    ) -> Result<Option<Piece>, FarmError> {
44        Ok(self.read_piece(sector_index, piece_offset).await)
45    }
46}
47
48impl DiskPieceReader {
49    /// Creates new piece reader instance and background future that handles reads internally.
50    ///
51    /// NOTE: Background future is async, but does blocking operations and should be running in
52    /// dedicated thread.
53    #[expect(clippy::too_many_arguments)]
54    pub(super) fn new<PosTable>(
55        public_key_hash: Blake3Hash,
56        shard_commitments_roots_cache: ShardCommitmentsRootsCache,
57        pieces_in_sector: u16,
58        plot_file: Arc<DirectIoFileWrapper>,
59        sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
60        erasure_coding: ErasureCoding,
61        sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
62        global_mutex: Arc<AsyncMutex<()>>,
63    ) -> (Self, impl Future<Output = ()>)
64    where
65        PosTable: Table,
66    {
67        let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);
68
69        let reading_fut = async move {
70            read_pieces::<PosTable, _>(
71                public_key_hash,
72                shard_commitments_roots_cache,
73                pieces_in_sector,
74                &*plot_file,
75                sectors_metadata,
76                erasure_coding,
77                sectors_being_modified,
78                read_piece_receiver,
79                global_mutex,
80            )
81            .await
82        };
83
84        (Self { read_piece_sender }, reading_fut)
85    }
86
87    pub(super) fn close_all_readers(&mut self) {
88        self.read_piece_sender.close_channel();
89    }
90
91    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
92    /// reader was shut down
93    pub async fn read_piece(
94        &self,
95        sector_index: SectorIndex,
96        piece_offset: PieceOffset,
97    ) -> Option<Piece> {
98        let (response_sender, response_receiver) = oneshot::channel();
99        self.read_piece_sender
100            .clone()
101            .send(ReadPieceRequest {
102                sector_index,
103                piece_offset,
104                response_sender,
105            })
106            .await
107            .ok()?;
108        response_receiver.await.ok()?
109    }
110}
111
112#[expect(clippy::too_many_arguments)]
113async fn read_pieces<PosTable, S>(
114    public_key_hash: Blake3Hash,
115    shard_commitments_roots_cache: ShardCommitmentsRootsCache,
116    pieces_in_sector: u16,
117    plot_file: S,
118    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
119    erasure_coding: ErasureCoding,
120    sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
121    mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
122    global_mutex: Arc<AsyncMutex<()>>,
123) where
124    PosTable: Table,
125    S: ReadAtSync,
126{
127    // TODO: Reuse global table generator (this comment is in many files)
128    let table_generator = PosTable::generator();
129
130    while let Some(read_piece_request) = read_piece_receiver.next().await {
131        let ReadPieceRequest {
132            sector_index,
133            piece_offset,
134            response_sender,
135        } = read_piece_request;
136
137        if response_sender.is_canceled() {
138            continue;
139        }
140
141        let sectors_being_modified = &*sectors_being_modified.read().await;
142
143        if sectors_being_modified.contains(&sector_index) {
144            // Skip sector that is being modified right now
145            continue;
146        }
147
148        let (sector_metadata, sector_count) = {
149            let sectors_metadata = sectors_metadata.read().await;
150
151            let sector_count = sectors_metadata.len() as u16;
152
153            let sector_metadata = match sectors_metadata.get(usize::from(sector_index)) {
154                Some(sector_metadata) => sector_metadata.clone(),
155                None => {
156                    error!(
157                        %sector_index,
158                        %sector_count,
159                        "Tried to read piece from sector that is not yet plotted"
160                    );
161                    continue;
162                }
163            };
164
165            (sector_metadata, sector_count)
166        };
167
168        // Sector must be plotted
169        if u16::from(sector_index) >= sector_count {
170            warn!(
171                %sector_index,
172                %piece_offset,
173                %sector_count,
174                "Incorrect sector offset"
175            );
176            // Doesn't matter if receiver still cares about it
177            let _ = response_sender.send(None);
178            continue;
179        }
180        // Piece must be within sector
181        if u16::from(piece_offset) >= pieces_in_sector {
182            warn!(
183                %sector_index,
184                %piece_offset,
185                %sector_count,
186                "Incorrect piece offset"
187            );
188            // Doesn't matter if receiver still cares about it
189            let _ = response_sender.send(None);
190            continue;
191        }
192
193        let sector_size = sector_size(pieces_in_sector);
194        let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);
195
196        // Take mutex briefly to make sure piece reading is allowed right now
197        global_mutex.lock().await;
198
199        let maybe_piece = read_piece::<PosTable, _, _>(
200            &public_key_hash,
201            &shard_commitments_roots_cache.get(sector_metadata.history_size),
202            piece_offset,
203            &sector_metadata,
204            // TODO: Async
205            &ReadAt::from_sync(&sector),
206            &erasure_coding,
207            &table_generator,
208        )
209        .await;
210
211        // Doesn't matter if receiver still cares about it
212        let _ = response_sender.send(maybe_piece);
213    }
214}
215
216async fn read_piece<PosTable, S, A>(
217    public_key_hash: &Blake3Hash,
218    shard_commitments_root: &ShardCommitmentHash,
219    piece_offset: PieceOffset,
220    sector_metadata: &SectorMetadataChecksummed,
221    sector: &ReadAt<S, A>,
222    erasure_coding: &ErasureCoding,
223    table_generator: &PosTable::Generator,
224) -> Option<Piece>
225where
226    PosTable: Table,
227    S: ReadAtSync,
228    A: ReadAtAsync,
229{
230    let sector_index = sector_metadata.sector_index;
231
232    let sector_id = SectorId::new(
233        public_key_hash,
234        shard_commitments_root,
235        sector_index,
236        sector_metadata.history_size,
237    );
238
239    let piece = match reading::read_piece::<PosTable, _, _>(
240        piece_offset,
241        &sector_id,
242        sector_metadata,
243        sector,
244        erasure_coding,
245        table_generator,
246    )
247    .await
248    {
249        Ok(piece) => piece,
250        Err(error) => {
251            error!(
252                %sector_index,
253                %piece_offset,
254                %error,
255                "Failed to read piece from sector"
256            );
257            return None;
258        }
259    };
260
261    Some(piece)
262}