Skip to main content

ab_data_retrieval/
piece_getter.rs

1//! Getting object pieces from the network or various caches
2
3use ab_archiving::archiver::NewArchivedSegment;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use async_trait::async_trait;
6use futures::{Stream, StreamExt, stream};
7use std::fmt;
8use std::future::Future;
9use std::sync::Arc;
10
11/// Trait representing a way to get pieces
12#[async_trait]
13pub trait PieceGetter: fmt::Debug {
14    /// Get piece by index.
15    ///
16    /// Returns `Ok(None)` if the piece is not found.
17    /// Returns `Err(_)` if trying to get the piece caused an error.
18    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
19
20    /// Get pieces with provided indices.
21    ///
22    /// The number of elements in the returned stream is the same as the number of unique
23    /// `piece_indices`.
24    async fn get_pieces<'a>(
25        &'a self,
26        piece_indices: Vec<PieceIndex>,
27    ) -> anyhow::Result<
28        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
29    >;
30}
31
32#[async_trait]
33impl<T> PieceGetter for Arc<T>
34where
35    T: PieceGetter + Send + Sync + ?Sized,
36{
37    #[inline]
38    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
39        self.as_ref().get_piece(piece_index).await
40    }
41
42    #[inline]
43    async fn get_pieces<'a>(
44        &'a self,
45        piece_indices: Vec<PieceIndex>,
46    ) -> anyhow::Result<
47        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
48    > {
49        self.as_ref().get_pieces(piece_indices).await
50    }
51}
52
53// Convenience methods, mainly used in testing
54#[async_trait]
55impl PieceGetter for NewArchivedSegment {
56    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
57        if u64::from(piece_index.segment_index()) == u64::from(self.segment_header.index.as_inner())
58        {
59            return Ok(Some(
60                self.pieces
61                    .pieces()
62                    .nth(usize::from(piece_index.position()))
63                    .expect("Piece position always exists in a segment; qed"),
64            ));
65        }
66
67        Ok(None)
68    }
69
70    async fn get_pieces<'a>(
71        &'a self,
72        piece_indices: Vec<PieceIndex>,
73    ) -> anyhow::Result<
74        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
75    > {
76        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
77    }
78}
79
80#[async_trait]
81impl PieceGetter for (PieceIndex, Piece) {
82    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
83        if self.0 == piece_index {
84            return Ok(Some(self.1.clone()));
85        }
86
87        Ok(None)
88    }
89
90    async fn get_pieces<'a>(
91        &'a self,
92        piece_indices: Vec<PieceIndex>,
93    ) -> anyhow::Result<
94        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
95    > {
96        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
97    }
98}
99
100#[async_trait]
101impl PieceGetter for Vec<(PieceIndex, Piece)> {
102    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
103        Ok(self.iter().find_map(|(index, piece)| {
104            if *index == piece_index {
105                Some(piece.clone())
106            } else {
107                None
108            }
109        }))
110    }
111
112    async fn get_pieces<'a>(
113        &'a self,
114        piece_indices: Vec<PieceIndex>,
115    ) -> anyhow::Result<
116        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
117    > {
118        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
119    }
120}
121
122/// A default implementation which gets each piece individually, using the `get_piece` async
123/// function.
124///
125/// This is mainly used for testing, most production implementations can fetch multiple pieces more
126/// efficiently.
127#[expect(clippy::type_complexity, reason = "type matches trait signature")]
128pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
129    // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result<Option<Piece>> once it stabilises
130    // https://github.com/rust-lang/rust/issues/62290
131    get_piece: Func,
132    piece_indices: PieceIndices,
133) -> anyhow::Result<
134    Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
135>
136where
137    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
138    Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
139    Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
140{
141    Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
142        move |piece_index| {
143            let get_piece = get_piece.clone();
144            async move { (piece_index, get_piece(piece_index).await) }
145        },
146    ))))
147}