ab_farmer/farmer_piece_getter/
piece_validator.rs

1//! Farmer-specific validator for pieces retrieved from the network
2
3use crate::node_client::NodeClient;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_networking::Node;
6use ab_networking::libp2p::PeerId;
7use ab_networking::utils::piece_provider::PieceValidator;
8use async_trait::async_trait;
9use tracing::{error, warn};
10
11/// Farmer-specific validator for pieces retrieved from the network.
12///
13/// Implements [`PieceValidator`].
14#[derive(Debug, Clone)]
15pub struct SegmentRootPieceValidator<NC> {
16    dsn_node: Node,
17    node_client: NC,
18}
19
20impl<NC> SegmentRootPieceValidator<NC> {
21    /// Create a new instance
22    pub fn new(dsn_node: Node, node_client: NC) -> Self {
23        Self {
24            dsn_node,
25            node_client,
26        }
27    }
28}
29
30#[async_trait]
31impl<NC> PieceValidator for SegmentRootPieceValidator<NC>
32where
33    NC: NodeClient,
34{
35    async fn validate_piece(
36        &self,
37        source_peer_id: PeerId,
38        piece_index: PieceIndex,
39        piece: Piece,
40    ) -> Option<Piece> {
41        if source_peer_id == self.dsn_node.id() {
42            return Some(piece);
43        }
44
45        let segment_index = piece_index.segment_index();
46
47        let segment_headers = match self.node_client.segment_headers(vec![segment_index]).await {
48            Ok(segment_headers) => segment_headers,
49            Err(error) => {
50                error!(
51                    %piece_index,
52                    ?error,
53                    "Failed to retrieve segment headers from node"
54                );
55                return None;
56            }
57        };
58
59        let segment_root = match segment_headers.into_iter().next().flatten() {
60            Some(segment_header) => segment_header.segment_root,
61            None => {
62                error!(
63                    %piece_index,
64                    %segment_index,
65                    "Segment root for segment index wasn't found on node"
66                );
67                return None;
68            }
69        };
70
71        let is_valid_fut = tokio::task::spawn_blocking(move || {
72            piece
73                .is_valid(&segment_root, piece_index.position())
74                .then_some(piece)
75        });
76
77        match is_valid_fut.await.unwrap_or_default() {
78            Some(piece) => Some(piece),
79            None => {
80                warn!(
81                    %piece_index,
82                    %source_peer_id,
83                    "Received invalid piece from peer"
84                );
85
86                // We don't care about result here
87                let _ = self.dsn_node.ban_peer(source_peer_id).await;
88                None
89            }
90        }
91    }
92}