//! Piece reader for a single disk farm (`ab_farmer/single_disk_farm/piece_reader.rs`).

use crate::farm::{FarmError, PieceReader};
use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceOffset};
use ab_core_primitives::sectors::{SectorId, SectorIndex};
use ab_erasure_coding::ErasureCoding;
use ab_farmer_components::sector::{SectorMetadataChecksummed, sector_size};
use ab_farmer_components::{ReadAt, ReadAtAsync, ReadAtSync, reading};
use ab_proof_of_space::Table;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use tracing::{error, warn};

/// A single request to read one piece, sent from [`DiskPieceReader`] to the background
/// processing loop in `read_pieces`.
#[derive(Debug)]
struct ReadPieceRequest {
    /// Sector to read from
    sector_index: SectorIndex,
    /// Offset of the piece within that sector
    piece_offset: PieceOffset,
    /// Channel used to deliver the result; if this sender is dropped without sending,
    /// the requester observes `None`
    response_sender: oneshot::Sender<Option<Piece>>,
}
27
/// Piece reader for a single disk farm.
///
/// Cheap to clone: holds only the sending half of a channel whose requests are served by
/// the background future returned from [`DiskPieceReader::new`].
#[derive(Debug, Clone)]
pub struct DiskPieceReader {
    /// Sender used to submit [`ReadPieceRequest`]s to the background processing loop
    read_piece_sender: mpsc::Sender<ReadPieceRequest>,
}
33
#[async_trait]
impl PieceReader for DiskPieceReader {
    #[inline]
    async fn read_piece(
        &self,
        sector_index: SectorIndex,
        piece_offset: PieceOffset,
    ) -> Result<Option<Piece>, FarmError> {
        // Delegates to the inherent `DiskPieceReader::read_piece` — inherent methods take
        // precedence over trait methods during resolution, so this does not recurse.
        // The inherent method is infallible (returns `Option`), hence the unconditional `Ok`.
        Ok(self.read_piece(sector_index, piece_offset).await)
    }
}
45
46impl DiskPieceReader {
47 #[allow(clippy::too_many_arguments)]
52 pub(super) fn new<PosTable>(
53 public_key_hash: Blake3Hash,
54 pieces_in_sector: u16,
55 plot_file: Arc<DirectIoFileWrapper>,
56 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
57 erasure_coding: ErasureCoding,
58 sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
59 global_mutex: Arc<AsyncMutex<()>>,
60 ) -> (Self, impl Future<Output = ()>)
61 where
62 PosTable: Table,
63 {
64 let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);
65
66 let reading_fut = async move {
67 read_pieces::<PosTable, _>(
68 public_key_hash,
69 pieces_in_sector,
70 &*plot_file,
71 sectors_metadata,
72 erasure_coding,
73 sectors_being_modified,
74 read_piece_receiver,
75 global_mutex,
76 )
77 .await
78 };
79
80 (Self { read_piece_sender }, reading_fut)
81 }
82
83 pub(super) fn close_all_readers(&mut self) {
84 self.read_piece_sender.close_channel();
85 }
86
87 pub async fn read_piece(
90 &self,
91 sector_index: SectorIndex,
92 piece_offset: PieceOffset,
93 ) -> Option<Piece> {
94 let (response_sender, response_receiver) = oneshot::channel();
95 self.read_piece_sender
96 .clone()
97 .send(ReadPieceRequest {
98 sector_index,
99 piece_offset,
100 response_sender,
101 })
102 .await
103 .ok()?;
104 response_receiver.await.ok()?
105 }
106}
107
108#[allow(clippy::too_many_arguments)]
109async fn read_pieces<PosTable, S>(
110 public_key_hash: Blake3Hash,
111 pieces_in_sector: u16,
112 plot_file: S,
113 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
114 erasure_coding: ErasureCoding,
115 sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
116 mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
117 global_mutex: Arc<AsyncMutex<()>>,
118) where
119 PosTable: Table,
120 S: ReadAtSync,
121{
122 let table_generator = PosTable::generator();
124
125 while let Some(read_piece_request) = read_piece_receiver.next().await {
126 let ReadPieceRequest {
127 sector_index,
128 piece_offset,
129 response_sender,
130 } = read_piece_request;
131
132 if response_sender.is_canceled() {
133 continue;
134 }
135
136 let sectors_being_modified = &*sectors_being_modified.read().await;
137
138 if sectors_being_modified.contains(§or_index) {
139 continue;
141 }
142
143 let (sector_metadata, sector_count) = {
144 let sectors_metadata = sectors_metadata.read().await;
145
146 let sector_count = sectors_metadata.len() as u16;
147
148 let sector_metadata = match sectors_metadata.get(usize::from(sector_index)) {
149 Some(sector_metadata) => sector_metadata.clone(),
150 None => {
151 error!(
152 %sector_index,
153 %sector_count,
154 "Tried to read piece from sector that is not yet plotted"
155 );
156 continue;
157 }
158 };
159
160 (sector_metadata, sector_count)
161 };
162
163 if u16::from(sector_index) >= sector_count {
165 warn!(
166 %sector_index,
167 %piece_offset,
168 %sector_count,
169 "Incorrect sector offset"
170 );
171 let _ = response_sender.send(None);
173 continue;
174 }
175 if u16::from(piece_offset) >= pieces_in_sector {
177 warn!(
178 %sector_index,
179 %piece_offset,
180 %sector_count,
181 "Incorrect piece offset"
182 );
183 let _ = response_sender.send(None);
185 continue;
186 }
187
188 let sector_size = sector_size(pieces_in_sector);
189 let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);
190
191 global_mutex.lock().await;
193
194 let maybe_piece = read_piece::<PosTable, _, _>(
195 &public_key_hash,
196 piece_offset,
197 §or_metadata,
198 &ReadAt::from_sync(§or),
200 &erasure_coding,
201 &table_generator,
202 )
203 .await;
204
205 let _ = response_sender.send(maybe_piece);
207 }
208}
209
210async fn read_piece<PosTable, S, A>(
211 public_key_hash: &Blake3Hash,
212 piece_offset: PieceOffset,
213 sector_metadata: &SectorMetadataChecksummed,
214 sector: &ReadAt<S, A>,
215 erasure_coding: &ErasureCoding,
216 table_generator: &PosTable::Generator,
217) -> Option<Piece>
218where
219 PosTable: Table,
220 S: ReadAtSync,
221 A: ReadAtAsync,
222{
223 let sector_index = sector_metadata.sector_index;
224
225 let sector_id = SectorId::new(public_key_hash, sector_index, sector_metadata.history_size);
226
227 let piece = match reading::read_piece::<PosTable, _, _>(
228 piece_offset,
229 §or_id,
230 sector_metadata,
231 sector,
232 erasure_coding,
233 table_generator,
234 )
235 .await
236 {
237 Ok(piece) => piece,
238 Err(error) => {
239 error!(
240 %sector_index,
241 %piece_offset,
242 %error,
243 "Failed to read piece from sector"
244 );
245 return None;
246 }
247 };
248
249 Some(piece)
250}