ab_data_retrieval/
piece_getter.rs

1//! Getting object pieces from the network or various caches
2
3use ab_archiving::archiver::NewArchivedSegment;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use async_trait::async_trait;
6use futures::{Stream, StreamExt, stream};
7use std::fmt;
8use std::future::Future;
9use std::sync::Arc;
10
11/// Trait representing a way to get pieces
12#[async_trait]
13pub trait PieceGetter: fmt::Debug {
14    /// Get piece by index.
15    ///
16    /// Returns `Ok(None)` if the piece is not found.
17    /// Returns `Err(_)` if trying to get the piece caused an error.
18    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
19
20    /// Get pieces with provided indices.
21    ///
22    /// The number of elements in the returned stream is the same as the number of unique
23    /// `piece_indices`.
24    async fn get_pieces<'a>(
25        &'a self,
26        piece_indices: Vec<PieceIndex>,
27    ) -> anyhow::Result<
28        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
29    >;
30}
31
32#[async_trait]
33impl<T> PieceGetter for Arc<T>
34where
35    T: PieceGetter + Send + Sync + ?Sized,
36{
37    #[inline]
38    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
39        self.as_ref().get_piece(piece_index).await
40    }
41
42    #[inline]
43    async fn get_pieces<'a>(
44        &'a self,
45        piece_indices: Vec<PieceIndex>,
46    ) -> anyhow::Result<
47        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
48    > {
49        self.as_ref().get_pieces(piece_indices).await
50    }
51}
52
53// Convenience methods, mainly used in testing
54#[async_trait]
55impl PieceGetter for NewArchivedSegment {
56    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
57        if piece_index.segment_index() == self.segment_header.segment_index() {
58            return Ok(Some(
59                self.pieces
60                    .pieces()
61                    .nth(piece_index.position() as usize)
62                    .expect("Piece position always exists in a segment; qed"),
63            ));
64        }
65
66        Ok(None)
67    }
68
69    async fn get_pieces<'a>(
70        &'a self,
71        piece_indices: Vec<PieceIndex>,
72    ) -> anyhow::Result<
73        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
74    > {
75        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
76    }
77}
78
79#[async_trait]
80impl PieceGetter for (PieceIndex, Piece) {
81    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
82        if self.0 == piece_index {
83            return Ok(Some(self.1.clone()));
84        }
85
86        Ok(None)
87    }
88
89    async fn get_pieces<'a>(
90        &'a self,
91        piece_indices: Vec<PieceIndex>,
92    ) -> anyhow::Result<
93        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
94    > {
95        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
96    }
97}
98
99#[async_trait]
100impl PieceGetter for Vec<(PieceIndex, Piece)> {
101    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
102        Ok(self.iter().find_map(|(index, piece)| {
103            if *index == piece_index {
104                Some(piece.clone())
105            } else {
106                None
107            }
108        }))
109    }
110
111    async fn get_pieces<'a>(
112        &'a self,
113        piece_indices: Vec<PieceIndex>,
114    ) -> anyhow::Result<
115        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
116    > {
117        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
118    }
119}
120
121/// A default implementation which gets each piece individually, using the `get_piece` async
122/// function.
123///
124/// This is mainly used for testing, most production implementations can fetch multiple pieces more
125/// efficiently.
126#[expect(clippy::type_complexity, reason = "type matches trait signature")]
127pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
128    // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result<Option<Piece>> once it stabilises
129    // https://github.com/rust-lang/rust/issues/62290
130    get_piece: Func,
131    piece_indices: PieceIndices,
132) -> anyhow::Result<
133    Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
134>
135where
136    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
137    Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
138    Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
139{
140    Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
141        move |piece_index| {
142            let get_piece = get_piece.clone();
143            async move { (piece_index, get_piece(piece_index).await) }
144        },
145    ))))
146}