ab_farmer/single_disk_farm/
piece_reader.rs

1//! Piece reader for single disk farm
2
3use crate::farm::{FarmError, PieceReader};
4use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
5use ab_core_primitives::hashes::Blake3Hash;
6use ab_core_primitives::pieces::{Piece, PieceOffset};
7use ab_core_primitives::sectors::{SectorId, SectorIndex};
8use ab_erasure_coding::ErasureCoding;
9use ab_farmer_components::sector::{SectorMetadataChecksummed, sector_size};
10use ab_farmer_components::{ReadAt, ReadAtAsync, ReadAtSync, reading};
11use ab_proof_of_space::Table;
12use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
13use async_trait::async_trait;
14use futures::channel::{mpsc, oneshot};
15use futures::{SinkExt, StreamExt};
16use std::collections::HashSet;
17use std::future::Future;
18use std::sync::Arc;
19use tracing::{error, warn};
20
21#[derive(Debug)]
22struct ReadPieceRequest {
23    sector_index: SectorIndex,
24    piece_offset: PieceOffset,
25    response_sender: oneshot::Sender<Option<Piece>>,
26}
27
28/// Wrapper data structure that can be used to read pieces from single disk farm
29#[derive(Debug, Clone)]
30pub struct DiskPieceReader {
31    read_piece_sender: mpsc::Sender<ReadPieceRequest>,
32}
33
34#[async_trait]
35impl PieceReader for DiskPieceReader {
36    #[inline]
37    async fn read_piece(
38        &self,
39        sector_index: SectorIndex,
40        piece_offset: PieceOffset,
41    ) -> Result<Option<Piece>, FarmError> {
42        Ok(self.read_piece(sector_index, piece_offset).await)
43    }
44}
45
46impl DiskPieceReader {
47    /// Creates new piece reader instance and background future that handles reads internally.
48    ///
49    /// NOTE: Background future is async, but does blocking operations and should be running in
50    /// dedicated thread.
51    #[allow(clippy::too_many_arguments)]
52    pub(super) fn new<PosTable>(
53        public_key_hash: Blake3Hash,
54        pieces_in_sector: u16,
55        plot_file: Arc<DirectIoFileWrapper>,
56        sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
57        erasure_coding: ErasureCoding,
58        sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
59        global_mutex: Arc<AsyncMutex<()>>,
60    ) -> (Self, impl Future<Output = ()>)
61    where
62        PosTable: Table,
63    {
64        let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);
65
66        let reading_fut = async move {
67            read_pieces::<PosTable, _>(
68                public_key_hash,
69                pieces_in_sector,
70                &*plot_file,
71                sectors_metadata,
72                erasure_coding,
73                sectors_being_modified,
74                read_piece_receiver,
75                global_mutex,
76            )
77            .await
78        };
79
80        (Self { read_piece_sender }, reading_fut)
81    }
82
83    pub(super) fn close_all_readers(&mut self) {
84        self.read_piece_sender.close_channel();
85    }
86
87    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
88    /// reader was shut down
89    pub async fn read_piece(
90        &self,
91        sector_index: SectorIndex,
92        piece_offset: PieceOffset,
93    ) -> Option<Piece> {
94        let (response_sender, response_receiver) = oneshot::channel();
95        self.read_piece_sender
96            .clone()
97            .send(ReadPieceRequest {
98                sector_index,
99                piece_offset,
100                response_sender,
101            })
102            .await
103            .ok()?;
104        response_receiver.await.ok()?
105    }
106}
107
108#[allow(clippy::too_many_arguments)]
109async fn read_pieces<PosTable, S>(
110    public_key_hash: Blake3Hash,
111    pieces_in_sector: u16,
112    plot_file: S,
113    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
114    erasure_coding: ErasureCoding,
115    sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
116    mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
117    global_mutex: Arc<AsyncMutex<()>>,
118) where
119    PosTable: Table,
120    S: ReadAtSync,
121{
122    // TODO: Reuse global table generator (this comment is in many files)
123    let table_generator = PosTable::generator();
124
125    while let Some(read_piece_request) = read_piece_receiver.next().await {
126        let ReadPieceRequest {
127            sector_index,
128            piece_offset,
129            response_sender,
130        } = read_piece_request;
131
132        if response_sender.is_canceled() {
133            continue;
134        }
135
136        let sectors_being_modified = &*sectors_being_modified.read().await;
137
138        if sectors_being_modified.contains(&sector_index) {
139            // Skip sector that is being modified right now
140            continue;
141        }
142
143        let (sector_metadata, sector_count) = {
144            let sectors_metadata = sectors_metadata.read().await;
145
146            let sector_count = sectors_metadata.len() as u16;
147
148            let sector_metadata = match sectors_metadata.get(usize::from(sector_index)) {
149                Some(sector_metadata) => sector_metadata.clone(),
150                None => {
151                    error!(
152                        %sector_index,
153                        %sector_count,
154                        "Tried to read piece from sector that is not yet plotted"
155                    );
156                    continue;
157                }
158            };
159
160            (sector_metadata, sector_count)
161        };
162
163        // Sector must be plotted
164        if u16::from(sector_index) >= sector_count {
165            warn!(
166                %sector_index,
167                %piece_offset,
168                %sector_count,
169                "Incorrect sector offset"
170            );
171            // Doesn't matter if receiver still cares about it
172            let _ = response_sender.send(None);
173            continue;
174        }
175        // Piece must be within sector
176        if u16::from(piece_offset) >= pieces_in_sector {
177            warn!(
178                %sector_index,
179                %piece_offset,
180                %sector_count,
181                "Incorrect piece offset"
182            );
183            // Doesn't matter if receiver still cares about it
184            let _ = response_sender.send(None);
185            continue;
186        }
187
188        let sector_size = sector_size(pieces_in_sector);
189        let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);
190
191        // Take mutex briefly to make sure piece reading is allowed right now
192        global_mutex.lock().await;
193
194        let maybe_piece = read_piece::<PosTable, _, _>(
195            &public_key_hash,
196            piece_offset,
197            &sector_metadata,
198            // TODO: Async
199            &ReadAt::from_sync(&sector),
200            &erasure_coding,
201            &table_generator,
202        )
203        .await;
204
205        // Doesn't matter if receiver still cares about it
206        let _ = response_sender.send(maybe_piece);
207    }
208}
209
210async fn read_piece<PosTable, S, A>(
211    public_key_hash: &Blake3Hash,
212    piece_offset: PieceOffset,
213    sector_metadata: &SectorMetadataChecksummed,
214    sector: &ReadAt<S, A>,
215    erasure_coding: &ErasureCoding,
216    table_generator: &PosTable::Generator,
217) -> Option<Piece>
218where
219    PosTable: Table,
220    S: ReadAtSync,
221    A: ReadAtAsync,
222{
223    let sector_index = sector_metadata.sector_index;
224
225    let sector_id = SectorId::new(public_key_hash, sector_index, sector_metadata.history_size);
226
227    let piece = match reading::read_piece::<PosTable, _, _>(
228        piece_offset,
229        &sector_id,
230        sector_metadata,
231        sector,
232        erasure_coding,
233        table_generator,
234    )
235    .await
236    {
237        Ok(piece) => piece,
238        Err(error) => {
239            error!(
240                %sector_index,
241                %piece_offset,
242                %error,
243                "Failed to read piece from sector"
244            );
245            return None;
246        }
247    };
248
249    Some(piece)
250}