Skip to main content

ab_farmer/farmer_piece_getter/
piece_validator.rs

1//! Farmer-specific validator for pieces retrieved from the network
2
3use crate::node_client::NodeClient;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_networking::Node;
6use ab_networking::libp2p::PeerId;
7use ab_networking::utils::piece_provider::PieceValidator;
8use async_trait::async_trait;
9use tracing::{error, warn};
10
11/// Farmer-specific validator for pieces retrieved from the network.
12///
13/// Implements [`PieceValidator`].
14#[derive(Debug, Clone)]
15pub struct SegmentRootPieceValidator<NC> {
16    dsn_node: Node,
17    node_client: NC,
18}
19
20impl<NC> SegmentRootPieceValidator<NC> {
21    /// Create a new instance
22    pub fn new(dsn_node: Node, node_client: NC) -> Self {
23        Self {
24            dsn_node,
25            node_client,
26        }
27    }
28}
29
30#[async_trait]
31impl<NC> PieceValidator for SegmentRootPieceValidator<NC>
32where
33    NC: NodeClient,
34{
35    async fn validate_piece(
36        &self,
37        source_peer_id: PeerId,
38        piece_index: PieceIndex,
39        piece: Piece,
40    ) -> Option<Piece> {
41        if source_peer_id == self.dsn_node.id() {
42            return Some(piece);
43        }
44
45        let segment_index = piece_index.segment_index();
46
47        let super_segment_headers = match self
48            .node_client
49            .super_segment_headers(vec![piece.header.super_segment_index.as_inner()])
50            .await
51        {
52            Ok(super_segment_headers) => super_segment_headers,
53            Err(error) => {
54                error!(
55                    %piece_index,
56                    ?error,
57                    "Failed to retrieve super segment headers from node"
58                );
59                return None;
60            }
61        };
62
63        let Some(super_segment_header) = super_segment_headers.into_iter().next().flatten() else {
64            error!(
65                %piece_index,
66                %segment_index,
67                "Super segment root for super segment index wasn't found on node"
68            );
69            return None;
70        };
71
72        let is_valid_fut = tokio::task::spawn_blocking(move || {
73            piece
74                .is_valid(
75                    &super_segment_header.root,
76                    super_segment_header.num_segments,
77                    piece_index.position(),
78                )
79                .then_some(piece)
80        });
81
82        match is_valid_fut.await.unwrap_or_default() {
83            Some(piece) => Some(piece),
84            None => {
85                warn!(
86                    %piece_index,
87                    %source_peer_id,
88                    "Received invalid piece from peer"
89                );
90
91                // We don't care about result here
92                let _ = self.dsn_node.ban_peer(source_peer_id).await;
93                None
94            }
95        }
96    }
97}