ab_data_retrieval/
piece_getter.rs1use ab_archiving::archiver::NewArchivedSegment;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use async_trait::async_trait;
6use futures::{Stream, StreamExt, stream};
7use std::fmt;
8use std::future::Future;
9use std::sync::Arc;
10
11#[async_trait]
13pub trait PieceGetter: fmt::Debug {
14 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
19
20 async fn get_pieces<'a>(
25 &'a self,
26 piece_indices: Vec<PieceIndex>,
27 ) -> anyhow::Result<
28 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
29 >;
30}
31
32#[async_trait]
33impl<T> PieceGetter for Arc<T>
34where
35 T: PieceGetter + Send + Sync + ?Sized,
36{
37 #[inline]
38 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
39 self.as_ref().get_piece(piece_index).await
40 }
41
42 #[inline]
43 async fn get_pieces<'a>(
44 &'a self,
45 piece_indices: Vec<PieceIndex>,
46 ) -> anyhow::Result<
47 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
48 > {
49 self.as_ref().get_pieces(piece_indices).await
50 }
51}
52
53#[async_trait]
55impl PieceGetter for NewArchivedSegment {
56 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
57 if u64::from(piece_index.segment_index()) == u64::from(self.segment_header.index.as_inner())
58 {
59 return Ok(Some(
60 self.pieces
61 .pieces()
62 .nth(usize::from(piece_index.position()))
63 .expect("Piece position always exists in a segment; qed"),
64 ));
65 }
66
67 Ok(None)
68 }
69
70 async fn get_pieces<'a>(
71 &'a self,
72 piece_indices: Vec<PieceIndex>,
73 ) -> anyhow::Result<
74 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
75 > {
76 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
77 }
78}
79
80#[async_trait]
81impl PieceGetter for (PieceIndex, Piece) {
82 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
83 if self.0 == piece_index {
84 return Ok(Some(self.1.clone()));
85 }
86
87 Ok(None)
88 }
89
90 async fn get_pieces<'a>(
91 &'a self,
92 piece_indices: Vec<PieceIndex>,
93 ) -> anyhow::Result<
94 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
95 > {
96 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
97 }
98}
99
100#[async_trait]
101impl PieceGetter for Vec<(PieceIndex, Piece)> {
102 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
103 Ok(self.iter().find_map(|(index, piece)| {
104 if *index == piece_index {
105 Some(piece.clone())
106 } else {
107 None
108 }
109 }))
110 }
111
112 async fn get_pieces<'a>(
113 &'a self,
114 piece_indices: Vec<PieceIndex>,
115 ) -> anyhow::Result<
116 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
117 > {
118 get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
119 }
120}
121
122#[expect(clippy::type_complexity, reason = "type matches trait signature")]
128pub fn get_pieces_individually<'a, PieceIndices, Func, Fut>(
129 get_piece: Func,
132 piece_indices: PieceIndices,
133) -> anyhow::Result<
134 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
135>
136where
137 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
138 Func: Fn(PieceIndex) -> Fut + Clone + Send + 'a,
139 Fut: Future<Output = anyhow::Result<Option<Piece>>> + Send + Unpin + 'a,
140{
141 Ok(Box::new(Box::pin(stream::iter(piece_indices).then(
142 move |piece_index| {
143 let get_piece = get_piece.clone();
144 async move { (piece_index, get_piece(piece_index).await) }
145 },
146 ))))
147}