Skip to main content

ab_networking/utils/
piece_provider.rs

1//! Provides methods to retrieve pieces from DSN.
2
3use crate::constructor::DummyRecordStore;
4use crate::protocols::request_response::handlers::cached_piece_by_index::{
5    CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
6};
7use crate::protocols::request_response::handlers::piece_by_index::{
8    PieceByIndexRequest, PieceByIndexResponse,
9};
10use crate::utils::multihash::ToMultihash;
11use crate::{Multihash, Node};
12use ab_core_primitives::pieces::{Piece, PieceIndex};
13use async_lock::{Semaphore, SemaphoreGuard};
14use async_trait::async_trait;
15use futures::channel::mpsc;
16use futures::future::FusedFuture;
17use futures::stream::FuturesUnordered;
18use futures::task::noop_waker_ref;
19use futures::{FutureExt, Stream, StreamExt, stream};
20use libp2p::kad::{Behaviour as Kademlia, KBucketKey, RecordKey};
21use libp2p::swarm::NetworkBehaviour;
22use libp2p::{Multiaddr, PeerId};
23use rand::prelude::*;
24use std::any::type_name;
25use std::collections::{HashMap, HashSet};
26use std::fmt;
27use std::future::ready;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31use tokio_stream::StreamMap;
32use tracing::{Instrument, debug, trace, warn};
33
34/// Validates piece against using its root.
35#[async_trait]
36pub trait PieceValidator: Sync + Send {
37    /// Validates piece against using its root.
38    async fn validate_piece(
39        &self,
40        source_peer_id: PeerId,
41        piece_index: PieceIndex,
42        piece: Piece,
43    ) -> Option<Piece>;
44}
45
46/// Stub implementation for piece validation.
47#[derive(Debug, Clone, Copy)]
48pub struct NoPieceValidator;
49
50#[async_trait]
51impl PieceValidator for NoPieceValidator {
52    async fn validate_piece(&self, _: PeerId, _: PieceIndex, piece: Piece) -> Option<Piece> {
53        Some(piece)
54    }
55}
56
57/// Piece provider with cancellation and piece validator.
58/// Use `NoPieceValidator` to disable validation.
59#[derive(Clone)]
60pub struct PieceProvider<PV> {
61    node: Node,
62    piece_validator: PV,
63    piece_downloading_semaphore: Arc<Semaphore>,
64}
65
66impl<PV> fmt::Debug for PieceProvider<PV> {
67    #[inline]
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct(&format!("PieceProvider<{}>", type_name::<PV>()))
70            .finish_non_exhaustive()
71    }
72}
73
74impl<PV> PieceProvider<PV>
75where
76    PV: PieceValidator,
77{
78    /// Creates new piece provider.
79    pub fn new(
80        node: Node,
81        piece_validator: PV,
82        piece_downloading_semaphore: Arc<Semaphore>,
83    ) -> Self {
84        Self {
85            node,
86            piece_validator,
87            piece_downloading_semaphore,
88        }
89    }
90
91    /// Get pieces with provided indices from cache.
92    ///
93    /// Number of elements in returned stream is the same as number of unique `piece_indices`.
94    pub fn get_from_cache<'a, PieceIndices>(
95        &'a self,
96        piece_indices: PieceIndices,
97    ) -> impl Future<Output = impl Stream<Item = (PieceIndex, Option<Piece>)> + Unpin + 'a>
98    where
99        PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
100    {
101        let download_id = rand::random::<u64>();
102        let (tx, mut rx) = mpsc::unbounded();
103        let fut = async move {
104            let not_downloaded_pieces = download_cached_pieces(
105                piece_indices,
106                &self.node,
107                &self.piece_validator,
108                &tx,
109                &self.piece_downloading_semaphore,
110            )
111            .await;
112
113            if not_downloaded_pieces.is_empty() {
114                debug!("Done");
115                return;
116            }
117
118            for piece_index in not_downloaded_pieces {
119                tx.unbounded_send((piece_index, None))
120                    .expect("This future isn't polled after receiver is dropped; qed");
121            }
122
123            debug!("Done #2");
124        };
125
126        let mut fut = Box::pin(fut.instrument(tracing::info_span!("", %download_id)).fuse());
127
128        // Drive above future and stream back any pieces that were downloaded so far
129        ready(stream::poll_fn(move |cx| {
130            if !fut.is_terminated() {
131                // Result doesn't matter, we'll need to poll stream below anyway
132                let _: Poll<()> = fut.poll_unpin(cx);
133            }
134
135            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
136                return Poll::Ready(maybe_result);
137            }
138
139            // Exit will be done by the stream above
140            Poll::Pending
141        }))
142    }
143
144    /// Returns piece by its index from farmer's piece cache (L2)
145    pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
146        let key = RecordKey::from(piece_index.to_multihash());
147
148        let request_batch = self.node.get_requests_batch_handle().await;
149        let mut get_providers_stream = request_batch
150            .get_providers(key.clone())
151            .await
152            .inspect_err(|err| warn!(%piece_index,?key, ?err, "get_providers returned an error"))
153            .ok()?;
154
155        let key = hex::encode(&key);
156        while let Some(provider_id) = get_providers_stream.next().await {
157            trace!(%piece_index, key, %provider_id, "get_providers returned an item");
158
159            let Ok(PieceByIndexResponse {
160                piece,
161                cached_pieces: _,
162            }) = request_batch
163                .send_generic_request(
164                    provider_id,
165                    Vec::new(),
166                    PieceByIndexRequest {
167                        piece_index,
168                        cached_pieces: Arc::default(),
169                    },
170                )
171                .await
172                .inspect_err(
173                    |error| debug!(%piece_index, key, %provider_id, ?error, "Piece request failed"),
174                )
175            else {
176                continue;
177            };
178
179            if let Some(piece) = piece {
180                trace!(%piece_index, key, %provider_id, "Piece request succeeded");
181
182                return self
183                    .piece_validator
184                    .validate_piece(provider_id, piece_index, piece)
185                    .await;
186            }
187
188            debug!(%piece_index, key, %provider_id, "Piece request returned empty piece");
189        }
190
191        None
192    }
193
194    /// Get piece from a particular peer.
195    pub async fn get_piece_from_peer(
196        &self,
197        peer_id: PeerId,
198        piece_index: PieceIndex,
199    ) -> Option<Piece> {
200        // TODO: Take advantage of `cached_pieces`
201        let PieceByIndexResponse {
202            piece,
203            cached_pieces: _,
204        } = self
205            .node
206            .send_generic_request(
207                peer_id,
208                Vec::new(),
209                PieceByIndexRequest {
210                    piece_index,
211                    cached_pieces: Arc::default(),
212                },
213            )
214            .await
215            .inspect_err(|error| debug!(%peer_id, %piece_index, ?error, "Piece request failed"))
216            .ok()?;
217
218        if let Some(piece) = piece {
219            trace!(%peer_id, %piece_index, "Piece request succeeded");
220
221            return self
222                .piece_validator
223                .validate_piece(peer_id, piece_index, piece)
224                .await;
225        }
226
227        debug!(%peer_id, %piece_index, "Piece request returned empty piece");
228
229        None
230    }
231
232    /// Get piece from archival storage (L1). The algorithm tries to get a piece from currently
233    /// connected peers and falls back to random walking.
234    pub async fn get_piece_from_archival_storage(
235        &self,
236        piece_index: PieceIndex,
237        max_random_walking_rounds: usize,
238    ) -> Option<Piece> {
239        // TODO: consider using retry policy for L1 lookups as well.
240        trace!(%piece_index, "Getting piece from archival storage..");
241
242        let connected_servers = {
243            let connected_servers = match self.node.connected_servers().await {
244                Ok(connected_servers) => connected_servers,
245                Err(err) => {
246                    debug!(%piece_index, ?err, "Cannot get connected peers (DSN L1 lookup)");
247
248                    Default::default()
249                }
250            };
251
252            HashSet::<PeerId>::from_iter(connected_servers)
253        };
254
255        if connected_servers.is_empty() {
256            debug!(%piece_index, "Cannot acquire piece from no connected peers (DSN L1 lookup)");
257        } else {
258            for peer_id in connected_servers {
259                let maybe_piece = self.get_piece_from_peer(peer_id, piece_index).await;
260
261                if maybe_piece.is_some() {
262                    trace!(%piece_index, %peer_id, "DSN L1 lookup from connected peers succeeded");
263
264                    return maybe_piece;
265                }
266            }
267        }
268
269        trace!(%piece_index, "Getting piece from DSN L1 using random walk.");
270        let random_walk_result = self
271            .get_piece_by_random_walking(piece_index, max_random_walking_rounds)
272            .await;
273
274        if random_walk_result.is_some() {
275            trace!(%piece_index, "DSN L1 lookup via random walk succeeded");
276
277            return random_walk_result;
278        }
279
280        debug!(
281            %piece_index,
282            %max_random_walking_rounds,
283            "Cannot acquire piece from DSN L1: random walk failed"
284        );
285
286        None
287    }
288
289    /// Get piece from L1 by random walking
290    async fn get_piece_by_random_walking(
291        &self,
292        piece_index: PieceIndex,
293        walking_rounds: usize,
294    ) -> Option<Piece> {
295        for round in 0..walking_rounds {
296            debug!(%piece_index, round, "Random walk round");
297
298            let result = self
299                .get_piece_by_random_walking_from_single_round(piece_index, round)
300                .await;
301
302            if result.is_some() {
303                return result;
304            }
305        }
306
307        debug!(%piece_index, "Random walking piece retrieval failed.");
308
309        None
310    }
311
312    /// Get piece from L1 by random walking (single round)
313    async fn get_piece_by_random_walking_from_single_round(
314        &self,
315        piece_index: PieceIndex,
316        round: usize,
317    ) -> Option<Piece> {
318        // TODO: Take advantage of `cached_pieces`
319        trace!(%piece_index, "get_piece_by_random_walking round");
320
321        // Random walk key
322        let key = PeerId::random();
323
324        let request_batch = self.node.get_requests_batch_handle().await;
325        let mut get_closest_peers_stream = request_batch
326            .get_closest_peers(key.into())
327            .await
328            .inspect_err(|err| warn!(%piece_index, ?key, ?err, %round, "get_closest_peers returned an error"))
329            .ok()?;
330
331        while let Some(peer_id) = get_closest_peers_stream.next().await {
332            trace!(%piece_index, %peer_id, %round, "get_closest_peers returned an item");
333
334            let Ok(PieceByIndexResponse {
335                piece,
336                cached_pieces: _,
337            }) = request_batch
338                .send_generic_request(
339                    peer_id,
340                    Vec::new(),
341                    PieceByIndexRequest {
342                        piece_index,
343                        cached_pieces: Arc::default(),
344                    },
345                )
346                .await
347                .inspect_err(
348                    |error| debug!(%peer_id, %piece_index, ?key, %round, ?error, "Piece request failed."),
349                )
350            else {
351                continue;
352            };
353
354            if let Some(piece) = piece {
355                trace!(%peer_id, %piece_index, ?key, %round,  "Piece request succeeded.");
356
357                return self
358                    .piece_validator
359                    .validate_piece(peer_id, piece_index, piece)
360                    .await;
361            }
362
363            debug!(%peer_id, %piece_index, ?key, %round, "Piece request returned empty piece.");
364        }
365
366        None
367    }
368}
369
370/// Kademlia wrapper to take advantage of its internal logic of selecting closest peers
371struct KademliaWrapper {
372    local_peer_id: PeerId,
373    kademlia: Kademlia<DummyRecordStore>,
374}
375
376impl KademliaWrapper {
377    fn new(local_peer_id: PeerId) -> Self {
378        Self {
379            local_peer_id,
380            kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
381        }
382    }
383
384    fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
385        for address in addresses {
386            self.kademlia.add_address(peer_id, address);
387        }
388        while self
389            .kademlia
390            .poll(&mut Context::from_waker(noop_waker_ref()))
391            .is_ready()
392        {
393            // Simply drain useless events generated by above calls
394        }
395    }
396
397    /// Returned peers are already sorted in ascending distance order
398    fn closest_peers(
399        &mut self,
400        key: &KBucketKey<Multihash>,
401    ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
402        let mut closest_peers = self
403            .kademlia
404            .find_closest_local_peers(key, &self.local_peer_id)
405            .map(|peer| {
406                (
407                    KBucketKey::from(peer.node_id).distance(key),
408                    peer.node_id,
409                    peer.multiaddrs,
410                )
411            })
412            .collect::<Vec<_>>();
413
414        closest_peers.sort_unstable_by_key(|a| a.0);
415        closest_peers
416            .into_iter()
417            .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
418    }
419}
420
421/// Takes pieces to download as an input, sends results with pieces that were downloaded
422/// successfully and returns those that were not downloaded
423async fn download_cached_pieces<PV, PieceIndices>(
424    piece_indices: PieceIndices,
425    node: &Node,
426    piece_validator: &PV,
427    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
428    semaphore: &Semaphore,
429) -> impl ExactSizeIterator<Item = PieceIndex>
430where
431    PV: PieceValidator,
432    PieceIndices: IntoIterator<Item = PieceIndex>,
433{
434    // Make sure every piece index has an entry since this will be the primary container for
435    // tracking pieces to download going forward.
436    //
437    // At the end pieces that were not downloaded will remain with a collection of known closest
438    // peers for them.
439    let mut pieces_to_download = piece_indices
440        .into_iter()
441        .map(|piece_index| async move {
442            let mut kademlia = KademliaWrapper::new(node.id());
443            let key = piece_index.to_multihash();
444
445            let local_closest_peers = node
446                .get_closest_local_peers(key, None)
447                .await
448                .unwrap_or_default();
449
450            // Seed with local closest peers
451            for (peer_id, addresses) in local_closest_peers {
452                kademlia.add_peer(&peer_id, addresses);
453            }
454
455            (piece_index, kademlia)
456        })
457        .collect::<FuturesUnordered<_>>()
458        .collect::<HashMap<_, _>>()
459        .await;
460
461    let num_pieces = pieces_to_download.len();
462    debug!(%num_pieces, "Starting");
463
464    let mut checked_peers = HashSet::new();
465
466    let Ok(connected_servers) = node.connected_servers().await else {
467        trace!("Connected servers error");
468        return pieces_to_download.into_keys();
469    };
470
471    let num_connected_servers = connected_servers.len();
472    debug!(
473        %num_connected_servers,
474        %num_pieces,
475        "Starting downloading"
476    );
477
478    // Dispatch initial set of requests to peers with checked pieces distributed uniformly
479    let mut downloading_stream = connected_servers
480        .into_iter()
481        .take(num_pieces)
482        .enumerate()
483        .map(|(peer_index, peer_id)| {
484            checked_peers.insert(peer_id);
485
486            // Inside to avoid division by zero in case there are no connected servers or pieces
487            let step = num_pieces / num_connected_servers.min(num_pieces);
488
489            // Take unique first piece index for each connected peer and the rest just to check
490            // cached pieces up to recommended limit
491            let mut check_cached_pieces = pieces_to_download
492                .keys()
493                .cycle()
494                .skip(step * peer_index)
495                // + 1 because one index below is removed below
496                .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT + 1))
497                .copied()
498                .collect::<Vec<_>>();
499            // Pick first piece index as the piece we want to download
500            let piece_index = check_cached_pieces.swap_remove(0);
501
502            trace!(%peer_id, %piece_index, "Downloading piece from initially connected peer");
503
504            let permit = semaphore.try_acquire();
505
506            let fut = async move {
507                let permit = match permit {
508                    Some(permit) => permit,
509                    None => semaphore.acquire().await,
510                };
511
512                download_cached_piece_from_peer(
513                    node,
514                    piece_validator,
515                    peer_id,
516                    Vec::new(),
517                    Arc::new(check_cached_pieces),
518                    piece_index,
519                    HashSet::new(),
520                    HashSet::new(),
521                    permit,
522                )
523                .await
524            };
525
526            (piece_index, Box::pin(fut.into_stream()) as _)
527        })
528        .collect::<StreamMap<_, _>>();
529
530    loop {
531        // Process up to 50% of the pieces concurrently
532        let mut additional_pieces_to_download =
533            (num_pieces / 2).saturating_sub(downloading_stream.len());
534        if additional_pieces_to_download > 0 {
535            trace!(
536                %additional_pieces_to_download,
537                num_pieces,
538                currently_downloading = %downloading_stream.len(),
539                "Downloading additional pieces from closest peers"
540            );
541            // Pick up any newly connected peers (if any)
542            'outer: for peer_id in node
543                .connected_servers()
544                .await
545                .unwrap_or_default()
546                .into_iter()
547                .filter(|peer_id| checked_peers.insert(*peer_id))
548                .take(additional_pieces_to_download)
549            {
550                let permit = if downloading_stream.is_empty() {
551                    semaphore.acquire().await
552                } else if let Some(permit) = semaphore.try_acquire() {
553                    permit
554                } else {
555                    break;
556                };
557
558                for &piece_index in pieces_to_download.keys() {
559                    if downloading_stream.contains_key(&piece_index) {
560                        continue;
561                    }
562
563                    trace!(%peer_id, %piece_index, "Downloading piece from newly connected peer");
564
565                    let check_cached_pieces = sample_cached_piece_indices(
566                        pieces_to_download.keys(),
567                        &HashSet::new(),
568                        &HashSet::new(),
569                        piece_index,
570                    );
571                    let fut = download_cached_piece_from_peer(
572                        node,
573                        piece_validator,
574                        peer_id,
575                        Vec::new(),
576                        Arc::new(check_cached_pieces),
577                        piece_index,
578                        HashSet::new(),
579                        HashSet::new(),
580                        permit,
581                    );
582
583                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
584                    additional_pieces_to_download -= 1;
585
586                    continue 'outer;
587                }
588
589                break;
590            }
591
592            // Pick up more pieces to download from the closest peers
593            // Ideally we'd not allocate here, but it is hard to explain to the compiler that
594            // entries are not removed otherwise
595            let pieces_indices_to_download = pieces_to_download.keys().copied().collect::<Vec<_>>();
596            for piece_index in pieces_indices_to_download {
597                if additional_pieces_to_download == 0 {
598                    break;
599                }
600                if downloading_stream.contains_key(&piece_index) {
601                    continue;
602                }
603                let permit = if downloading_stream.is_empty() {
604                    semaphore.acquire().await
605                } else if let Some(permit) = semaphore.try_acquire() {
606                    permit
607                } else {
608                    break;
609                };
610
611                let kbucket_key = KBucketKey::from(piece_index.to_multihash());
612                let closest_peers_to_check = pieces_to_download
613                    .get_mut(&piece_index)
614                    .expect("Entries are not removed here; qed")
615                    .closest_peers(&kbucket_key);
616                for (peer_id, addresses) in closest_peers_to_check {
617                    if !checked_peers.insert(peer_id) {
618                        continue;
619                    }
620
621                    trace!(%peer_id, %piece_index, "Downloading piece from closest peer");
622
623                    let check_cached_pieces = sample_cached_piece_indices(
624                        pieces_to_download.keys(),
625                        &HashSet::new(),
626                        &HashSet::new(),
627                        piece_index,
628                    );
629                    let fut = download_cached_piece_from_peer(
630                        node,
631                        piece_validator,
632                        peer_id,
633                        addresses,
634                        Arc::new(check_cached_pieces),
635                        piece_index,
636                        HashSet::new(),
637                        HashSet::new(),
638                        permit,
639                    );
640
641                    downloading_stream.insert(piece_index, Box::pin(fut.into_stream()) as _);
642                    additional_pieces_to_download -= 1;
643                    break;
644                }
645            }
646
647            trace!(
648                pieces_left = %additional_pieces_to_download,
649                "Initiated downloading additional pieces from closest peers"
650            );
651        }
652
653        let Some((piece_index, result)) = downloading_stream.next().await else {
654            if !pieces_to_download.is_empty() {
655                debug!(
656                    %num_pieces,
657                    to_download = %pieces_to_download.len(),
658                    "Finished downloading early"
659                );
660                // Nothing was downloaded, we're done here
661                break;
662            }
663            break;
664        };
665        process_downloading_result(
666            piece_index,
667            result,
668            &mut pieces_to_download,
669            &mut downloading_stream,
670            node,
671            piece_validator,
672            results,
673        );
674
675        if pieces_to_download.is_empty() {
676            break;
677        }
678    }
679
680    pieces_to_download.into_keys()
681}
682
683fn process_downloading_result<'a, 'b, PV>(
684    piece_index: PieceIndex,
685    result: DownloadedPieceFromPeer<'a>,
686    pieces_to_download: &'b mut HashMap<PieceIndex, KademliaWrapper>,
687    downloading_stream: &'b mut StreamMap<
688        PieceIndex,
689        Pin<Box<dyn Stream<Item = DownloadedPieceFromPeer<'a>> + Send + 'a>>,
690    >,
691    node: &'a Node,
692    piece_validator: &'a PV,
693    results: &'a mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
694) where
695    PV: PieceValidator,
696{
697    let DownloadedPieceFromPeer {
698        peer_id,
699        result,
700        mut cached_pieces,
701        not_cached_pieces,
702        permit,
703    } = result;
704    trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");
705
706    let Some(result) = result else {
707        // Downloading failed, ignore peer
708        return;
709    };
710
711    match result {
712        PieceResult::Piece(piece) => {
713            trace!(%piece_index, %peer_id, "Got piece");
714
715            // Downloaded successfully
716            pieces_to_download.remove(&piece_index);
717
718            results
719                .unbounded_send((piece_index, Some(piece)))
720                .expect("This future isn't polled after receiver is dropped; qed");
721
722            if pieces_to_download.is_empty() {
723                return;
724            }
725
726            cached_pieces.remove(&piece_index);
727        }
728        PieceResult::ClosestPeers(closest_peers) => {
729            trace!(%piece_index, %peer_id, "Got closest peers");
730
731            // Store closer peers in case piece index was not downloaded yet
732            if let Some(kademlia) = pieces_to_download.get_mut(&piece_index) {
733                for (peer_id, addresses) in Vec::from(closest_peers) {
734                    kademlia.add_peer(&peer_id, addresses);
735                }
736            }
737
738            // No need to ask this peer again if they claimed to have this piece index earlier
739            if cached_pieces.remove(&piece_index) {
740                return;
741            }
742        }
743    }
744
745    let mut maybe_piece_index_to_download_next = None;
746    // Clear useless entries in cached pieces and find something to download next
747    cached_pieces.retain(|piece_index| {
748        // Clear downloaded pieces
749        if !pieces_to_download.contains_key(piece_index) {
750            return false;
751        }
752
753        // Try to pick a piece to download that is not being downloaded already
754        if maybe_piece_index_to_download_next.is_none()
755            && !downloading_stream.contains_key(piece_index)
756        {
757            maybe_piece_index_to_download_next.replace(*piece_index);
758            // We'll check it later when receiving response
759            return true;
760        }
761
762        // Retain everything else
763        true
764    });
765
766    let piece_index_to_download_next = if let Some(piece_index) = maybe_piece_index_to_download_next
767    {
768        trace!(%piece_index, %peer_id, "Next piece to download from peer");
769        piece_index
770    } else {
771        trace!(%peer_id, "Peer doesn't have anything else");
772        // Nothing left to do with this peer
773        return;
774    };
775
776    let fut = download_cached_piece_from_peer(
777        node,
778        piece_validator,
779        peer_id,
780        Vec::new(),
781        // Sample more random cached piece indices for connected peer, algorithm can be
782        // improved, but has to be something simple and this should do it for now
783        Arc::new(sample_cached_piece_indices(
784            pieces_to_download.keys(),
785            &cached_pieces,
786            &not_cached_pieces,
787            piece_index_to_download_next,
788        )),
789        piece_index_to_download_next,
790        cached_pieces,
791        not_cached_pieces,
792        permit,
793    );
794    downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
795}
796
797fn sample_cached_piece_indices<'a, I>(
798    pieces_to_download: I,
799    cached_pieces: &HashSet<PieceIndex>,
800    not_cached_pieces: &HashSet<PieceIndex>,
801    piece_index_to_download_next: PieceIndex,
802) -> Vec<PieceIndex>
803where
804    I: Iterator<Item = &'a PieceIndex>,
805{
806    pieces_to_download
807        // Do a bit of work to filter-out piece indices we already know remote peer
808        // has or doesn't to decrease burden on them
809        .filter_map(|piece_index| {
810            if piece_index == &piece_index_to_download_next
811                || cached_pieces.contains(piece_index)
812                || not_cached_pieces.contains(piece_index)
813            {
814                None
815            } else {
816                Some(*piece_index)
817            }
818        })
819        .sample(
820            &mut rand::rng(),
821            CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
822        )
823}
824
825struct DownloadedPieceFromPeer<'a> {
826    peer_id: PeerId,
827    result: Option<PieceResult>,
828    cached_pieces: HashSet<PieceIndex>,
829    not_cached_pieces: HashSet<PieceIndex>,
830    permit: SemaphoreGuard<'a>,
831}
832
833/// `check_cached_pieces` contains a list of pieces for peer to filter-out according to locally
834/// caches pieces, `cached_pieces` and `not_cached_pieces` contain piece indices peer claims is
835/// known to have or not have already
836#[expect(clippy::too_many_arguments)]
837async fn download_cached_piece_from_peer<'a, PV>(
838    node: &'a Node,
839    piece_validator: &'a PV,
840    peer_id: PeerId,
841    addresses: Vec<Multiaddr>,
842    check_cached_pieces: Arc<Vec<PieceIndex>>,
843    piece_index: PieceIndex,
844    mut cached_pieces: HashSet<PieceIndex>,
845    mut not_cached_pieces: HashSet<PieceIndex>,
846    permit: SemaphoreGuard<'a>,
847) -> DownloadedPieceFromPeer<'a>
848where
849    PV: PieceValidator,
850{
851    let result = match node
852        .send_generic_request(
853            peer_id,
854            addresses,
855            CachedPieceByIndexRequest {
856                piece_index,
857                cached_pieces: Arc::clone(&check_cached_pieces),
858            },
859        )
860        .await
861    {
862        Ok(response) => {
863            let CachedPieceByIndexResponse {
864                result,
865                cached_pieces,
866            } = response;
867
868            match result {
869                PieceResult::Piece(piece) => piece_validator
870                    .validate_piece(peer_id, piece_index, piece)
871                    .await
872                    .map(|piece| CachedPieceByIndexResponse {
873                        result: PieceResult::Piece(piece),
874                        cached_pieces,
875                    }),
876                PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
877                    result: PieceResult::ClosestPeers(closest_peers),
878                    cached_pieces,
879                }),
880            }
881        }
882        Err(error) => {
883            debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");
884
885            None
886        }
887    };
888
889    match result {
890        Some(result) => {
891            cached_pieces.extend(result.cached_pieces);
892            not_cached_pieces.extend(
893                check_cached_pieces
894                    .iter()
895                    .filter(|piece_index| !cached_pieces.contains(piece_index))
896                    .copied(),
897            );
898
899            DownloadedPieceFromPeer {
900                peer_id,
901                result: Some(result.result),
902                cached_pieces: { cached_pieces },
903                not_cached_pieces,
904                permit,
905            }
906        }
907        None => DownloadedPieceFromPeer {
908            peer_id,
909            result: None,
910            cached_pieces,
911            not_cached_pieces,
912            permit,
913        },
914    }
915}