ab_farmer/farmer_piece_getter/
piece_validator.rs1use crate::node_client::NodeClient;
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_networking::Node;
6use ab_networking::libp2p::PeerId;
7use ab_networking::utils::piece_provider::PieceValidator;
8use async_trait::async_trait;
9use tracing::{error, warn};
10
11#[derive(Debug, Clone)]
15pub struct SegmentRootPieceValidator<NC> {
16 dsn_node: Node,
17 node_client: NC,
18}
19
20impl<NC> SegmentRootPieceValidator<NC> {
21 pub fn new(dsn_node: Node, node_client: NC) -> Self {
23 Self {
24 dsn_node,
25 node_client,
26 }
27 }
28}
29
30#[async_trait]
31impl<NC> PieceValidator for SegmentRootPieceValidator<NC>
32where
33 NC: NodeClient,
34{
35 async fn validate_piece(
36 &self,
37 source_peer_id: PeerId,
38 piece_index: PieceIndex,
39 piece: Piece,
40 ) -> Option<Piece> {
41 if source_peer_id == self.dsn_node.id() {
42 return Some(piece);
43 }
44
45 let segment_index = piece_index.segment_index();
46
47 let segment_headers = match self.node_client.segment_headers(vec![segment_index]).await {
48 Ok(segment_headers) => segment_headers,
49 Err(error) => {
50 error!(
51 %piece_index,
52 ?error,
53 "Failed to retrieve segment headers from node"
54 );
55 return None;
56 }
57 };
58
59 let segment_root = match segment_headers.into_iter().next().flatten() {
60 Some(segment_header) => segment_header.segment_root,
61 None => {
62 error!(
63 %piece_index,
64 %segment_index,
65 "Segment root for segment index wasn't found on node"
66 );
67 return None;
68 }
69 };
70
71 let is_valid_fut = tokio::task::spawn_blocking(move || {
72 piece
73 .is_valid(&segment_root, piece_index.position())
74 .then_some(piece)
75 });
76
77 match is_valid_fut.await.unwrap_or_default() {
78 Some(piece) => Some(piece),
79 None => {
80 warn!(
81 %piece_index,
82 %source_peer_id,
83 "Received invalid piece from peer"
84 );
85
86 let _ = self.dsn_node.ban_peer(source_peer_id).await;
88 None
89 }
90 }
91 }
92}