ab_data_retrieval/
piece_getter.rs1use ab_archiving::archiver::NewArchivedSegment;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use async_trait::async_trait;
6use futures::{Stream, StreamExt, stream};
7use std::fmt;
8use std::future::Future;
9use std::sync::Arc;
10
11#[async_trait]
13pub trait PieceGetter: fmt::Debug {
14 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
19
20 async fn get_pieces<'a>(
25 &'a self,
26 piece_indices: Vec<PieceIndex>,
27 ) -> anyhow::Result<
28 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
29 >;
30}
31
32#[async_trait]
33impl<T> PieceGetter for Arc<T>
34where
35 T: PieceGetter + Send + Sync + ?Sized,
36{
37 #[inline]
38 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
39 self.as_ref().get_piece(piece_index).await
40 }
41
42 #[inline]
43 async fn get_pieces<'a>(
44 &'a self,
45 piece_indices: Vec<PieceIndex>,
46 ) -> anyhow::Result<
47 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
48 > {
49 self.as_ref().get_pieces(piece_indices).await
50 }
51}
52
53#[async_trait]
55impl PieceGetter for NewArchivedSegment {
56 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
57 if piece_index.segment_index() == self.segment_header.segment_index() {
58 return Ok(Some(
59 self.pieces
60 .pieces()
61 .nth(piece_index.position() as usize)
62 .expect("Piece position always exists in a segment; qed"),
63 ));
64 }
65
66 Ok(None)
67 }
68
69 async fn get_pieces<'a>(
70 &'a self,
71 piece_indices: Vec<PieceIndex>,
72 ) -> anyhow::Result<
73 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
74 > {
75 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
76 }
77}
78
79#[async_trait]
80impl PieceGetter for (PieceIndex, Piece) {
81 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
82 if self.0 == piece_index {
83 return Ok(Some(self.1.clone()));
84 }
85
86 Ok(None)
87 }
88
89 async fn get_pieces<'a>(
90 &'a self,
91 piece_indices: Vec<PieceIndex>,
92 ) -> anyhow::Result<
93 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
94 > {
95 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
96 }
97}
98
99#[async_trait]
100impl PieceGetter for Vec<(PieceIndex, Piece)> {
101 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
102 Ok(self.iter().find_map(|(index, piece)| {
103 if *index == piece_index {
104 Some(piece.clone())
105 } else {
106 None
107 }
108 }))
109 }
110
111 async fn get_pieces<'a>(
112 &'a self,
113 piece_indices: Vec<PieceIndex>,
114 ) -> anyhow::Result<
115 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
116 > {
117 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
118 }
119}
120
121#[expect(clippy::type_complexity, reason = "type matches trait signature")]
127pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
128 get_piece: Func,
131 piece_indices: PieceIndices,
132) -> anyhow::Result<
133 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
134>
135where
136 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
137 Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
138 Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
139{
140 Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
141 move |piece_index| {
142 let get_piece = get_piece.clone();
143 async move { (piece_index, get_piece(piece_index).await) }
144 },
145 ))))
146}