1use crate::constructor::DummyRecordStore;
4use crate::protocols::request_response::handlers::cached_piece_by_index::{
5 CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
6};
7use crate::protocols::request_response::handlers::piece_by_index::{
8 PieceByIndexRequest, PieceByIndexResponse,
9};
10use crate::utils::multihash::ToMultihash;
11use crate::{Multihash, Node};
12use ab_core_primitives::pieces::{Piece, PieceIndex};
13use async_lock::{Semaphore, SemaphoreGuard};
14use async_trait::async_trait;
15use futures::channel::mpsc;
16use futures::future::FusedFuture;
17use futures::stream::FuturesUnordered;
18use futures::task::noop_waker_ref;
19use futures::{FutureExt, Stream, StreamExt, stream};
20use libp2p::kad::{Behaviour as Kademlia, KBucketKey, RecordKey};
21use libp2p::swarm::NetworkBehaviour;
22use libp2p::{Multiaddr, PeerId};
23use rand::prelude::*;
24use std::any::type_name;
25use std::collections::{HashMap, HashSet};
26use std::fmt;
27use std::future::ready;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31use tokio_stream::StreamMap;
32use tracing::{Instrument, debug, trace, warn};
33
/// Validation hook applied to pieces received from remote peers.
#[async_trait]
pub trait PieceValidator: Sync + Send {
    /// Validates `piece`, claimed to correspond to `piece_index`, that was received from
    /// `source_peer_id`.
    ///
    /// Callers in this file treat `Some(piece)` as an accepted piece and `None` as "no usable
    /// piece".
    async fn validate_piece(
        &self,
        source_peer_id: PeerId,
        piece_index: PieceIndex,
        piece: Piece,
    ) -> Option<Piece>;
}
45
/// Stub [`PieceValidator`] that carries no state; its implementation accepts every piece.
#[derive(Debug, Clone, Copy)]
pub struct NoPieceValidator;
49
50#[async_trait]
51impl PieceValidator for NoPieceValidator {
52 async fn validate_piece(&self, _: PeerId, _: PieceIndex, piece: Piece) -> Option<Piece> {
53 Some(piece)
54 }
55}
56
/// Piece retrieval helper that sends piece requests through a [`Node`] and runs every received
/// piece through the configured validator.
#[derive(Clone)]
pub struct PieceProvider<PV> {
    /// Networking node used for all requests
    node: Node,
    /// Validator applied to pieces received from peers
    piece_validator: PV,
    /// Semaphore whose permits are held while cached pieces are being downloaded
    piece_downloading_semaphore: Arc<Semaphore>,
}
65
66impl<PV> fmt::Debug for PieceProvider<PV> {
67 #[inline]
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 f.debug_struct(&format!("PieceProvider<{}>", type_name::<PV>()))
70 .finish_non_exhaustive()
71 }
72}
73
74impl<PV> PieceProvider<PV>
75where
76 PV: PieceValidator,
77{
    /// Creates a new piece provider.
    ///
    /// * `node` - networking node used to send requests
    /// * `piece_validator` - validator applied to every piece received from a peer
    /// * `piece_downloading_semaphore` - semaphore from which a permit is acquired for each
    ///   cached piece download
    pub fn new(
        node: Node,
        piece_validator: PV,
        piece_downloading_semaphore: Arc<Semaphore>,
    ) -> Self {
        Self {
            node,
            piece_validator,
            piece_downloading_semaphore,
        }
    }
90
91 pub fn get_from_cache<'a, PieceIndices>(
95 &'a self,
96 piece_indices: PieceIndices,
97 ) -> impl Future<Output = impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a>
98 where
99 PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
100 {
101 let download_id = rand::random::<u64>();
102 let (tx, mut rx) = mpsc::unbounded();
103 let fut = async move {
104 let not_downloaded_pieces = download_cached_pieces(
105 piece_indices,
106 &self.node,
107 &self.piece_validator,
108 &tx,
109 &self.piece_downloading_semaphore,
110 )
111 .await;
112
113 if not_downloaded_pieces.is_empty() {
114 debug!("Done");
115 return;
116 }
117
118 for piece_index in not_downloaded_pieces {
119 tx.unbounded_send((piece_index, None))
120 .expect("This future isn't polled after receiver is dropped; qed");
121 }
122
123 debug!("Done #2");
124 };
125
126 let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());
127
128 ready(stream::poll_fn(move |cx| {
130 if !fut.is_terminated() {
131 let _: Poll<()> = fut.poll_unpin(cx);
133 }
134
135 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
136 return Poll::Ready(maybe_result);
137 }
138
139 Poll::Pending
141 }))
142 }
143
144 pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
146 let key = RecordKey::from(piece_index.to_multihash());
147
148 let request_batch = self.node.get_requests_batch_handle().await;
149 let mut get_providers_stream = request_batch
150 .get_providers(key.clone())
151 .await
152 .inspect_err(|err| warn!(%piece_index,?key, ?err, "get_providers returned an error"))
153 .ok()?;
154
155 let key = hex::encode(&key);
156 while let Some(provider_id) = get_providers_stream.next().await {
157 trace!(%piece_index, key, %provider_id, "get_providers returned an item");
158
159 let Ok(PieceByIndexResponse {
160 piece,
161 cached_pieces: _,
162 }) = request_batch
163 .send_generic_request(
164 provider_id,
165 Vec::new(),
166 PieceByIndexRequest {
167 piece_index,
168 cached_pieces: Arc::default(),
169 },
170 )
171 .await
172 .inspect_err(
173 |error| debug!(%piece_index, key, %provider_id, ?error, "Piece request failed"),
174 )
175 else {
176 continue;
177 };
178
179 if let Some(piece) = piece {
180 trace!(%piece_index, key, %provider_id, "Piece request succeeded");
181
182 return self
183 .piece_validator
184 .validate_piece(provider_id, piece_index, piece)
185 .await;
186 }
187
188 debug!(%piece_index, key, %provider_id, "Piece request returned empty piece");
189 }
190
191 None
192 }
193
194 pub async fn get_piece_from_peer(
196 &self,
197 peer_id: PeerId,
198 piece_index: PieceIndex,
199 ) -> Option<Piece> {
200 let PieceByIndexResponse {
202 piece,
203 cached_pieces: _,
204 } = self
205 .node
206 .send_generic_request(
207 peer_id,
208 Vec::new(),
209 PieceByIndexRequest {
210 piece_index,
211 cached_pieces: Arc::default(),
212 },
213 )
214 .await
215 .inspect_err(|error| debug!(%peer_id, %piece_index, ?error, "Piece request failed"))
216 .ok()?;
217
218 if let Some(piece) = piece {
219 trace!(%peer_id, %piece_index, "Piece request succeeded");
220
221 return self
222 .piece_validator
223 .validate_piece(peer_id, piece_index, piece)
224 .await;
225 }
226
227 debug!(%peer_id, %piece_index, "Piece request returned empty piece");
228
229 None
230 }
231
232 pub async fn get_piece_from_archival_storage(
235 &self,
236 piece_index: PieceIndex,
237 max_random_walking_rounds: usize,
238 ) -> Option<Piece> {
239 trace!(%piece_index, "Getting piece from archival storage..");
241
242 let connected_servers = {
243 let connected_servers = match self.node.connected_servers().await {
244 Ok(connected_servers) => connected_servers,
245 Err(err) => {
246 debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");
247
248 Default::default()
249 }
250 };
251
252 HashSet::<PeerId>::from_iter(connected_servers)
253 };
254
255 if connected_servers.is_empty() {
256 debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
257 } else {
258 for peer_id in connected_servers {
259 let maybe_piece = self.get_piece_from_peer(peer_id, piece_index).await;
260
261 if maybe_piece.is_some() {
262 trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");
263
264 return maybe_piece;
265 }
266 }
267 }
268
269 trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
270 let random_walk_result = self
271 .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
272 .await;
273
274 if random_walk_result.is_some() {
275 trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
276
277 return random_walk_result;
278 }
279
280 debug!(
281 %piece_index,
282 %max_random_walking_rounds,
283 "Cannot acquire piece from DSN L1: random walk failed"
284 );
285
286 None
287 }
288
289 async fn get_piece_by_random_walking(
291 &self,
292 piece_index: PieceIndex,
293 walking_rounds: usize,
294 ) -> Option<Piece> {
295 for round in 0..walking_rounds {
296 debug!(%piece_index, round, "Random walk round");
297
298 let result = self
299 .get_piece_by_random_walking_from_single_round(piece_index, round)
300 .await;
301
302 if result.is_some() {
303 return result;
304 }
305 }
306
307 debug!(%piece_index, "Random walking piece retrieval failed.");
308
309 None
310 }
311
312 async fn get_piece_by_random_walking_from_single_round(
314 &self,
315 piece_index: PieceIndex,
316 round: usize,
317 ) -> Option<Piece> {
318 trace!(%piece_index, "get_piece_by_random_walking round");
320
321 let key = PeerId::random();
323
324 let request_batch = self.node.get_requests_batch_handle().await;
325 let mut get_closest_peers_stream = request_batch
326 .get_closest_peers(key.into())
327 .await
328 .inspect_err(|err| warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"))
329 .ok()?;
330
331 while let Some(peer_id) = get_closest_peers_stream.next().await {
332 trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");
333
334 let Ok(PieceByIndexResponse {
335 piece,
336 cached_pieces: _,
337 }) = request_batch
338 .send_generic_request(
339 peer_id,
340 Vec::new(),
341 PieceByIndexRequest {
342 piece_index,
343 cached_pieces: Arc::default(),
344 },
345 )
346 .await
347 .inspect_err(
348 |error| debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."),
349 )
350 else {
351 continue;
352 };
353
354 if let Some(piece) = piece {
355 trace!(%peer_id, %piece_index, ?key, %round, "Piece request succeeded.");
356
357 return self
358 .piece_validator
359 .validate_piece(peer_id, piece_index, piece)
360 .await;
361 }
362
363 debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
364 }
365
366 None
367 }
368}
369
/// Standalone Kademlia instance backed by [`DummyRecordStore`] that is used in this file as a
/// local collection of peers which can be ordered by distance to a key.
struct KademliaWrapper {
    /// Peer ID the Kademlia instance was created with, also passed as the source when querying
    /// for closest local peers
    local_peer_id: PeerId,
    /// Kademlia instance holding known peers and their addresses
    kademlia: Kademlia<DummyRecordStore>,
}
375
376impl KademliaWrapper {
377 fn new(local_peer_id: PeerId) -> Self {
378 Self {
379 local_peer_id,
380 kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
381 }
382 }
383
384 fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
385 for address in addresses {
386 self.kademlia.add_address(peer_id, address);
387 }
388 while self
389 .kademlia
390 .poll(&mut Context::from_waker(noop_waker_ref()))
391 .is_ready()
392 {
393 }
395 }
396
397 fn closest_peers(
399 &mut self,
400 key: &KBucketKey<Multihash>,
401 ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
402 let mut closest_peers = self
403 .kademlia
404 .find_closest_local_peers(key, &self.local_peer_id)
405 .map(|peer| {
406 (
407 KBucketKey::from(peer.node_id).distance(key),
408 peer.node_id,
409 peer.multiaddrs,
410 )
411 })
412 .collect::<Vec<_>>();
413
414 closest_peers.sort_unstable_by_key(|a| a.0);
415 closest_peers
416 .into_iter()
417 .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
418 }
419}
420
/// Downloads as many of the requested pieces as possible, sending every downloaded piece as
/// `(piece_index, Some(piece))` to `results`.
///
/// Requests are first sent to connected servers, then to servers that connected in the meantime
/// and to the closest peers known for each piece. Every in-flight request holds a permit from
/// `semaphore`.
///
/// Returns an iterator over piece indices that were not downloaded.
async fn download_cached_pieces<PV, PieceIndices>(
    piece_indices: PieceIndices,
    node: &Node,
    piece_validator: &PV,
    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
    semaphore: &Semaphore,
) -> impl ExactSizeIterator<Item = PieceIndex>
where
    PV: PieceValidator,
    PieceIndices: IntoIterator<Item = PieceIndex>,
{
    // For every piece create a separate Kademlia instance pre-populated with the closest peers
    // already known locally (an error when querying them is treated as "no peers")
    let mut pieces_to_download = piece_indices
        .into_iter()
        .map(|piece_index| async move {
            let mut kademlia = KademliaWrapper::new(node.id());
            let key = piece_index.to_multihash();

            let local_closest_peers = node
                .get_closest_local_peers(key, None)
                .await
                .unwrap_or_default();

            for (peer_id, addresses) in local_closest_peers {
                kademlia.add_peer(&peer_id, addresses);
            }

            (piece_index, kademlia)
        })
        .collect::<FuturesUnordered<_>>()
        .collect::<HashMap<_, _>>()
        .await;

    let num_pieces = pieces_to_download.len();
    debug!(%num_pieces, "Starting");

    // Peers that a request was already scheduled for (or that were already considered), they are
    // not picked again when looking for additional peers
    let mut checked_peers = HashSet::new();

    // Without connected servers nothing is downloaded and all pieces are reported back
    let Ok(connected_servers) = node.connected_servers().await else {
        trace!("Connected servers error");
        return pieces_to_download.into_keys();
    };

    let num_connected_servers = connected_servers.len();
    debug!(
        %num_connected_servers,
        %num_pieces,
        "Starting downloading"
    );

    // Initial set of downloads: at most one request per connected server and at most `num_pieces`
    // requests overall, keyed by the piece index being downloaded
    let mut downloading_stream = connected_servers
        .into_iter()
        .take(num_pieces)
        .enumerate()
        .map(|(peer_index, peer_id)| {
            checked_peers.insert(peer_id);

            // The closure only runs when there is at least one piece and one server, so the
            // divisor is never zero
            let step = num_pieces / num_connected_servers.min(num_pieces);

            // Every peer starts at a different offset into the (cycled) piece indices, the first
            // index is the piece to download and the rest are indices to check for being cached
            let mut check_cached_pieces = pieces_to_download
                .keys()
                .cycle()
                .skip(step * peer_index)
                .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1))
                .copied()
                .collect::<Vec<_>>();
            let piece_index = check_cached_pieces.swap_remove(0);

            trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer");

            // Try to grab a permit right away, otherwise it is awaited inside the future
            let permit = semaphore.try_acquire();

            let fut = async move {
                let permit = match permit {
                    Some(permit) => permit,
                    None => semaphore.acquire().await,
                };

                download_cached_piece_from_peer(
                    node,
                    piece_validator,
                    peer_id,
                    Vec::new(),
                    Arc::new(check_cached_pieces),
                    piece_index,
                    HashSet::new(),
                    HashSet::new(),
                    permit,
                )
                .await
            };

            (piece_index, Box::pin(fut.into_stream()) as _)
        })
        .collect::<StreamMap<_, _>>();

    loop {
        // Top up in-flight downloads so that at least half of the originally requested number of
        // pieces is being downloaded at any time
        let mut additional_pieces_to_download =
            (num_pieces / 2).saturating_sub(downloading_stream.len());
        if additional_pieces_to_download > 0 {
            trace!(
                %additional_pieces_to_download,
                num_pieces,
                currently_downloading = %downloading_stream.len(),
                "Downloading additional pieces from closest peers"
            );
            // First use connected servers that were not checked yet (an error when querying them
            // is treated as "no servers")
            'outer: for peer_id in node
                .connected_servers()
                .await
                .unwrap_or_default()
                .into_iter()
                .filter(|peer_id| checked_peers.insert(*peer_id))
                .take(additional_pieces_to_download)
            {
                // Wait for a permit only when nothing is in flight, otherwise take one only if it
                // is available immediately and stop adding downloads if it is not
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                // Pick the first piece that is not being downloaded yet
                for &piece_index in pieces_to_download.keys() {
                    if downloading_stream.contains_key(&piece_index) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        Vec::new(),
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;

                    continue 'outer;
                }

                // Reached only when every remaining piece is already being downloaded, there is
                // nothing to give to further peers
                break;
            }

            // Then use the closest peers known for each piece that is not being downloaded yet.
            // Keys are copied because `pieces_to_download` is borrowed mutably inside the loop.
            let pieces_indices_to_download = pieces_to_download.keys().copied().collect::<Vec<_>>();
            for piece_index in pieces_indices_to_download {
                if additional_pieces_to_download == 0 {
                    break;
                }
                if downloading_stream.contains_key(&piece_index) {
                    continue;
                }
                // Same permit policy as above
                let permit = if downloading_stream.is_empty() {
                    semaphore.acquire().await
                } else if let Some(permit) = semaphore.try_acquire() {
                    permit
                } else {
                    break;
                };

                let kbucket_key = KBucketKey::from(piece_index.to_multihash());
                let closest_peers_to_check = pieces_to_download
                    .get_mut(&piece_index)
                    .expect("Entries are not removed here; qed")
                    .closest_peers(&kbucket_key);
                for (peer_id, addresses) in closest_peers_to_check {
                    // Skip peers that were already checked, marking the rest as checked
                    if !checked_peers.insert(peer_id) {
                        continue;
                    }

                    trace!(%peer_id, %piece_index, "Downloading piece from closest peer");

                    let check_cached_pieces = sample_cached_piece_indices(
                        pieces_to_download.keys(),
                        &HashSet::new(),
                        &HashSet::new(),
                        piece_index,
                    );
                    let fut = download_cached_piece_from_peer(
                        node,
                        piece_validator,
                        peer_id,
                        addresses,
                        Arc::new(check_cached_pieces),
                        piece_index,
                        HashSet::new(),
                        HashSet::new(),
                        permit,
                    );

                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
                    additional_pieces_to_download -= 1;
                    // One peer per piece at a time
                    break;
                }
            }

            trace!(
                pieces_left = %additional_pieces_to_download,
                "Initiated downloading additional pieces from closest peers"
            );
        }

        // Wait for any of the in-flight downloads to finish, the stream ending means there is
        // nothing in flight and no further peers could be scheduled above
        let Some((piece_index, result)) = downloading_stream.next().await else {
            if !pieces_to_download.is_empty() {
                debug!(
                    %num_pieces,
                    to_download = %pieces_to_download.len(),
                    "Finished downloading early"
                );
                break;
            }
            break;
        };
        // Records the result and, if possible, schedules the next download from the same peer
        process_downloading_result(
            piece_index,
            result,
            &mut pieces_to_download,
            &mut downloading_stream,
            node,
            piece_validator,
            results,
        );

        if pieces_to_download.is_empty() {
            break;
        }
    }

    // Whatever is left was not downloaded
    pieces_to_download.into_keys()
}
682
683fn process_downloading_result<'a, 'b, PV>(
684 piece_index: PieceIndex,
685 result: DownloadedPieceFromPeer<'a>,
686 pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>,
687 downloading_stream: &'b mut StreamMap<
688 PieceIndex,
689 Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>,
690 >,
691 node: &'a Node,
692 piece_validator: &'a PV,
693 results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
694) where
695 PV: PieceValidator,
696{
697 let DownloadedPieceFromPeer {
698 peer_id,
699 result,
700 mut cached_pieces,
701 not_cached_pieces,
702 permit,
703 } = result;
704 trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");
705
706 let Some(result) = result else {
707 return;
709 };
710
711 match result {
712 PieceResult::Piece(piece) => {
713 trace!(%piece_index, %peer_id, "Got piece");
714
715 pieces_to_download.remove(&piece_index);
717
718 results
719 .unbounded_send((piece_index, Some(piece)))
720 .expect("This future isn't polled after receiver is dropped; qed");
721
722 if pieces_to_download.is_empty() {
723 return;
724 }
725
726 cached_pieces.remove(&piece_index);
727 }
728 PieceResult::ClosestPeers(closest_peers) => {
729 trace!(%piece_index, %peer_id, "Got closest peers");
730
731 if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) {
733 for (peer_id, addresses) in Vec::from(closest_peers) {
734 kademlia.add_peer(&peer_id, addresses);
735 }
736 }
737
738 if cached_pieces.remove(&piece_index) {
740 return;
741 }
742 }
743 }
744
745 let mut maybe_piece_index_to_download_next = None;
746 cached_pieces.retain(|piece_index| {
748 if !pieces_to_download.contains_key(piece_index) {
750 return false;
751 }
752
753 if maybe_piece_index_to_download_next.is_none()
755 && !downloading_stream.contains_key(piece_index)
756 {
757 maybe_piece_index_to_download_next.replace(*piece_index);
758 return true;
760 }
761
762 true
764 });
765
766 let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next
767 {
768 trace!(%piece_index, %peer_id, "Next piece to download from peer");
769 piece_index
770 } else {
771 trace!(%peer_id, "Peer doesn't have anything else");
772 return;
774 };
775
776 let fut = download_cached_piece_from_peer(
777 node,
778 piece_validator,
779 peer_id,
780 Vec::new(),
781 Arc::new(sample_cached_piece_indices(
784 pieces_to_download.keys(),
785 &cached_pieces,
786 ¬_cached_pieces,
787 piece_index_to_download_next,
788 )),
789 piece_index_to_download_next,
790 cached_pieces,
791 not_cached_pieces,
792 permit,
793 );
794 downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
795}
796
797fn sample_cached_piece_indices<'a, I>(
798 pieces_to_download: I,
799 cached_pieces: &HashSet<PieceIndex>,
800 not_cached_pieces: &HashSet<PieceIndex>,
801 piece_index_to_download_next: PieceIndex,
802) -> Vec<PieceIndex>
803where
804 I: Iterator<Item = &'a PieceIndex>,
805{
806 pieces_to_download
807 .filter_map(|piece_index| {
810 if piece_index == &piece_index_to_download_next
811 || cached_pieces.contains(piece_index)
812 || not_cached_pieces.contains(piece_index)
813 {
814 None
815 } else {
816 Some(*piece_index)
817 }
818 })
819 .sample(
820 &mut rand::rng(),
821 CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
822 )
823}
824
/// Outcome of a single cached piece request sent to a peer.
struct DownloadedPieceFromPeer<'a> {
    /// Peer the request was sent to
    peer_id: PeerId,
    /// Response of the peer, `None` if the request failed or the returned piece was rejected by
    /// the validator
    result: Option<PieceResult>,
    /// Piece indices the peer reported as cached, accumulated across requests to this peer
    cached_pieces: HashSet<PieceIndex>,
    /// Piece indices the peer was asked about but didn't report as cached, accumulated across
    /// requests to this peer
    not_cached_pieces: HashSet<PieceIndex>,
    /// Semaphore permit held for the duration of the request, handed back to the caller so that
    /// it can be reused for the next request
    permit: SemaphoreGuard<'a>,
}
832
833#[expect(clippy::too_many_arguments)]
837async fn download_cached_piece_from_peer<'a, PV>(
838 node: &'a Node,
839 piece_validator: &'a PV,
840 peer_id: PeerId,
841 addresses: Vec<Multiaddr>,
842 check_cached_pieces: Arc<Vec<PieceIndex>>,
843 piece_index: PieceIndex,
844 mut cached_pieces: HashSet<PieceIndex>,
845 mut not_cached_pieces: HashSet<PieceIndex>,
846 permit: SemaphoreGuard<'a>,
847) -> DownloadedPieceFromPeer<'a>
848where
849 PV: PieceValidator,
850{
851 let result = match node
852 .send_generic_request(
853 peer_id,
854 addresses,
855 CachedPieceByIndexRequest {
856 piece_index,
857 cached_pieces: Arc::clone(&check_cached_pieces),
858 },
859 )
860 .await
861 {
862 Ok(response) => {
863 let CachedPieceByIndexResponse {
864 result,
865 cached_pieces,
866 } = response;
867
868 match result {
869 PieceResult::Piece(piece) => piece_validator
870 .validate_piece(peer_id, piece_index, piece)
871 .await
872 .map(|piece| CachedPieceByIndexResponse {
873 result: PieceResult::Piece(piece),
874 cached_pieces,
875 }),
876 PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
877 result: PieceResult::ClosestPeers(closest_peers),
878 cached_pieces,
879 }),
880 }
881 }
882 Err(error) => {
883 debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");
884
885 None
886 }
887 };
888
889 match result {
890 Some(result) => {
891 cached_pieces.extend(result.cached_pieces);
892 not_cached_pieces.extend(
893 check_cached_pieces
894 .iter()
895 .filter(|piece_index| !cached_pieces.contains(piece_index))
896 .copied(),
897 );
898
899 DownloadedPieceFromPeer {
900 peer_id,
901 result: Some(result.result),
902 cached_pieces: { cached_pieces },
903 not_cached_pieces,
904 permit,
905 }
906 }
907 None => DownloadedPieceFromPeer {
908 peer_id,
909 result: None,
910 cached_pieces,
911 not_cached_pieces,
912 permit,
913 },
914 }
915}