Skip to main content

ab_data_retrieval/
piece_getter.rs

1//! Getting object pieces from the network or various caches
2
3use ab_archiving::archiver::NewArchivedSegment;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::SegmentIndex;
6use async_trait::async_trait;
7use futures::{Stream, StreamExt, stream};
8use std::fmt;
9use std::future::Future;
10use std::sync::Arc;
11
12/// Trait representing a way to get pieces
13#[async_trait]
14pub trait PieceGetter: fmt::Debug {
15    /// Get piece by index.
16    ///
17    /// Returns `Ok(None)` if the piece is not found.
18    /// Returns `Err(_)` if trying to get the piece caused an error.
19    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
20
21    /// Get pieces with provided indices.
22    ///
23    /// The number of elements in the returned stream is the same as the number of unique
24    /// `piece_indices`.
25    async fn get_pieces<'a>(
26        &'a self,
27        piece_indices: Vec<PieceIndex>,
28    ) -> anyhow::Result<
29        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
30    >;
31}
32
33#[async_trait]
34impl<T> PieceGetter for Arc<T>
35where
36    T: PieceGetter + Send + Sync + ?Sized,
37{
38    #[inline]
39    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
40        self.as_ref().get_piece(piece_index).await
41    }
42
43    #[inline]
44    async fn get_pieces<'a>(
45        &'a self,
46        piece_indices: Vec<PieceIndex>,
47    ) -> anyhow::Result<
48        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
49    > {
50        self.as_ref().get_pieces(piece_indices).await
51    }
52}
53
54// Convenience methods, mainly used in testing
55#[async_trait]
56impl PieceGetter for NewArchivedSegment {
57    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
58        if piece_index.segment_index()
59            == SegmentIndex::new(self.segment_header.local_segment_index().as_u64())
60        {
61            return Ok(Some(
62                self.pieces
63                    .pieces()
64                    .nth(usize::from(piece_index.position()))
65                    .expect("Piece position always exists in a segment; qed"),
66            ));
67        }
68
69        Ok(None)
70    }
71
72    async fn get_pieces<'a>(
73        &'a self,
74        piece_indices: Vec<PieceIndex>,
75    ) -> anyhow::Result<
76        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
77    > {
78        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
79    }
80}
81
82#[async_trait]
83impl PieceGetter for (PieceIndex, Piece) {
84    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
85        if self.0 == piece_index {
86            return Ok(Some(self.1.clone()));
87        }
88
89        Ok(None)
90    }
91
92    async fn get_pieces<'a>(
93        &'a self,
94        piece_indices: Vec<PieceIndex>,
95    ) -> anyhow::Result<
96        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
97    > {
98        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
99    }
100}
101
102#[async_trait]
103impl PieceGetter for Vec<(PieceIndex, Piece)> {
104    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
105        Ok(self.iter().find_map(|(index, piece)| {
106            if *index == piece_index {
107                Some(piece.clone())
108            } else {
109                None
110            }
111        }))
112    }
113
114    async fn get_pieces<'a>(
115        &'a self,
116        piece_indices: Vec<PieceIndex>,
117    ) -> anyhow::Result<
118        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
119    > {
120        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
121    }
122}
123
124/// A default implementation which gets each piece individually, using the `get_piece` async
125/// function.
126///
127/// This is mainly used for testing, most production implementations can fetch multiple pieces more
128/// efficiently.
129#[expect(clippy::type_complexity, reason = "type matches trait signature")]
130pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
131    // TODO: replace with AsyncFn(PieceIndex) -> anyhow::Result<Option<Piece>> once it stabilises
132    // https://github.com/rust-lang/rust/issues/62290
133    get_piece: Func,
134    piece_indices: PieceIndices,
135) -> anyhow::Result<
136    Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
137>
138where
139    PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
140    Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
141    Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
142{
143    Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
144        move |piece_index| {
145            let get_piece = get_piece.clone();
146            async move { (piece_index, get_piece(piece_index).await) }
147        },
148    ))))
149}