ab_farmer/single_disk_farm/piece_reader.rs

use crate::farm::{FarmError, PieceReader};
use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceOffset};
use ab_core_primitives::sectors::{SectorId, SectorIndex};
use ab_core_primitives::solutions::ShardCommitmentHash;
use ab_erasure_coding::ErasureCoding;
use ab_farmer_components::sector::{SectorMetadataChecksummed, sector_size};
use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
use ab_farmer_components::{ReadAt, ReadAtAsync, ReadAtSync, reading};
use ab_proof_of_space::Table;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use tracing::{error, warn};

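/// A single piece read request; the result is sent back to the requester over `response_sender`.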
#[derive(Debug)]
struct ReadPieceRequest {
    sector_index: SectorIndex,
    piece_offset: PieceOffset,
    response_sender: oneshot::Sender<Option<Piece>>,
}

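/// Piece reader for a single disk farm.
///
/// Cloning is cheap; all clones forward requests over the same channel to one background
/// reading future.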
#[derive(Debug, Clone)]
pub struct DiskPieceReader {
    read_piece_sender: mpsc::Sender<ReadPieceRequest>,
}

#[async_trait]
impl PieceReader for DiskPieceReader {
    #[inline]
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError> {
        Ok(self.read_piece(sector_index, piece_offset).await)
    }
}

impl DiskPieceReader {
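    /// Creates a new piece reader together with the background future that services read
    /// requests; the future must be polled (typically spawned onto an executor) for
    /// `read_piece` to make progress.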
    #[expect(clippy::too_many_arguments)]
    pub(super) fn new<PosTable>(
        public_key_hash: Blake3Hash,
        shard_commitments_roots_cache: ShardCommitmentsRootsCache,
        pieces_in_sector: u16,
        plot_file: Arc<DirectIoFileWrapper>,
        sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        erasure_coding: ErasureCoding,
        sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
        global_mutex: Arc<AsyncMutex<()>>,
    ) -> (Self, impl Future<Output = ()>)
    where
        PosTable: Table,
    {
        let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);

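        // The returned future runs the request-servicing loop and completes once the request
        // channel is closed.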
        let reading_fut = async move {
            read_pieces::<PosTable, _>(
                public_key_hash,
                shard_commitments_roots_cache,
                pieces_in_sector,
                &*plot_file,
                sectors_metadata,
                erasure_coding,
                sectors_being_modified,
                read_piece_receiver,
                global_mutex,
            )
            .await
        };

        (Self { read_piece_sender }, reading_fut)
    }

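    /// Closes the request channel, causing the background reading future to exit once pending
    /// requests are drained.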
    pub(super) fn close_all_readers(&mut self) {
        self.read_piece_sender.close_channel();
    }

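    /// Reads a piece from the given sector at the given piece offset.
    ///
    /// Returns `None` if the sector is currently being modified, the offsets are out of range,
    /// reading fails, or the reader has been closed.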
    pub async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Option<Piece> {
        let (response_sender, response_receiver) = oneshot::channel();
        self.read_piece_sender
            .clone()
            .send(ReadPieceRequest {
                sector_index,
                piece_offset,
                response_sender,
            })
            .await
            .ok()?;
        response_receiver.await.ok()?
    }
}

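/// Background loop that services piece read requests until the request channel is closed.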
#[expect(clippy::too_many_arguments)]
async fn read_pieces<PosTable, S>(
    public_key_hash: Blake3Hash,
    shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    pieces_in_sector: u16,
    plot_file: S,
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    erasure_coding: ErasureCoding,
    sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
    global_mutex: Arc<AsyncMutex<()>>,
) where
    PosTable: Table,
    S: ReadAtSync,
{
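    // The proof-of-space table generator is created once and reused for every request.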
    let table_generator = PosTable::generator();

    while let Some(read_piece_request) = read_piece_receiver.next().await {
        let ReadPieceRequest {
            sector_index,
            piece_offset,
            response_sender,
        } = read_piece_request;

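        // Skip requests whose receiver has already been dropped.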
        if response_sender.is_canceled() {
            continue;
        }

        let sectors_being_modified = &*sectors_being_modified.read().await;

        if sectors_being_modified.contains(&sector_index) {
            continue;
        }

        let (sector_metadata, sector_count) = {
            let sectors_metadata = sectors_metadata.read().await;

            let sector_count = sectors_metadata.len() as u16;

            let sector_metadata = match sectors_metadata.get(usize::from(sector_index)) {
                Some(sector_metadata) => sector_metadata.clone(),
                None => {
                    error!(
                        %sector_index,
                        %sector_count,
                        "Tried to read piece from sector that is not yet plotted"
                    );
                    continue;
                }
            };

            (sector_metadata, sector_count)
        };

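        // Reject requests with out-of-range sector or piece offsets before touching the plot.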
        if u16::from(sector_index) >= sector_count {
            warn!(
                %sector_index,
                %piece_offset,
                %sector_count,
                "Incorrect sector offset"
            );
            let _ = response_sender.send(None);
            continue;
        }
        if u16::from(piece_offset) >= pieces_in_sector {
            warn!(
                %sector_index,
                %piece_offset,
                %sector_count,
                "Incorrect piece offset"
            );
            let _ = response_sender.send(None);
            continue;
        }

        let sector_size = sector_size(pieces_in_sector);
        let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);

        global_mutex.lock().await;

        let maybe_piece = read_piece::<PosTable, _, _>(
            &public_key_hash,
            &shard_commitments_roots_cache.get(sector_metadata.history_size),
            piece_offset,
            &sector_metadata,
            &ReadAt::from_sync(&sector),
            &erasure_coding,
            &table_generator,
        )
        .await;

        let _ = response_sender.send(maybe_piece);
    }
}

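/// Reads and decodes a single piece from a plotted sector, returning `None` and logging an
/// error if reading fails.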
async fn read_piece<PosTable, S, A>(
    public_key_hash: &Blake3Hash,
    shard_commitments_root: &ShardCommitmentHash,
    piece_offset: PieceOffset,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    table_generator: &PosTable::Generator,
) -> Option<Piece>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let sector_index = sector_metadata.sector_index;

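    // Re-derive the sector ID this sector was plotted with; it is needed to decode the piece.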
    let sector_id = SectorId::new(
        public_key_hash,
        shard_commitments_root,
        sector_index,
        sector_metadata.history_size,
    );

    let piece = match reading::read_piece::<PosTable, _, _>(
        piece_offset,
        &sector_id,
        sector_metadata,
        sector,
        erasure_coding,
        table_generator,
    )
    .await
    {
        Ok(piece) => piece,
        Err(error) => {
            error!(
                %sector_index,
                %piece_offset,
                %error,
                "Failed to read piece from sector"
            );
            return None;
        }
    };

    Some(piece)
}