ab_data_retrieval/
piece_getter.rs1use ab_archiving::archiver::NewArchivedSegment;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::SegmentIndex;
6use async_trait::async_trait;
7use futures::{Stream, StreamExt, stream};
8use std::fmt;
9use std::future::Future;
10use std::sync::Arc;
11
12#[async_trait]
14pub trait PieceGetter: fmt::Debug {
15 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
20
21 async fn get_pieces<'a>(
26 &'a self,
27 piece_indices: Vec<PieceIndex>,
28 ) -> anyhow::Result<
29 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
30 >;
31}
32
33#[async_trait]
34impl<T> PieceGetter for Arc<T>
35where
36 T: PieceGetter + Send + Sync + ?Sized,
37{
38 #[inline]
39 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
40 self.as_ref().get_piece(piece_index).await
41 }
42
43 #[inline]
44 async fn get_pieces<'a>(
45 &'a self,
46 piece_indices: Vec<PieceIndex>,
47 ) -> anyhow::Result<
48 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
49 > {
50 self.as_ref().get_pieces(piece_indices).await
51 }
52}
53
54#[async_trait]
56impl PieceGetter for NewArchivedSegment {
57 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
58 if piece_index.segment_index()
59 == SegmentIndex::new(self.segment_header.local_segment_index().as_u64())
60 {
61 return Ok(Some(
62 self.pieces
63 .pieces()
64 .nth(usize::from(piece_index.position()))
65 .expect("Piece position always exists in a segment; qed"),
66 ));
67 }
68
69 Ok(None)
70 }
71
72 async fn get_pieces<'a>(
73 &'a self,
74 piece_indices: Vec<PieceIndex>,
75 ) -> anyhow::Result<
76 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
77 > {
78 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
79 }
80}
81
82#[async_trait]
83impl PieceGetter for (PieceIndex, Piece) {
84 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
85 if self.0 == piece_index {
86 return Ok(Some(self.1.clone()));
87 }
88
89 Ok(None)
90 }
91
92 async fn get_pieces<'a>(
93 &'a self,
94 piece_indices: Vec<PieceIndex>,
95 ) -> anyhow::Result<
96 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
97 > {
98 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
99 }
100}
101
102#[async_trait]
103impl PieceGetter for Vec<(PieceIndex, Piece)> {
104 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
105 Ok(self.iter().find_map(|(index, piece)| {
106 if *index == piece_index {
107 Some(piece.clone())
108 } else {
109 None
110 }
111 }))
112 }
113
114 async fn get_pieces<'a>(
115 &'a self,
116 piece_indices: Vec<PieceIndex>,
117 ) -> anyhow::Result<
118 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
119 > {
120 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
121 }
122}
123
124#[expect(clippy::type_complexity, reason = "type matches trait signature")]
130pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
131 get_piece: Func,
134 piece_indices: PieceIndices,
135) -> anyhow::Result<
136 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
137>
138where
139 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
140 Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
141 Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
142{
143 Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
144 move |piece_index| {
145 let get_piece = get_piece.clone();
146 async move { (piece_index, get_piece(piece_index).await) }
147 },
148 ))))
149}