diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index c6f2e00df53a26..89f9de78491101 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -408,11 +408,16 @@ async fn handle_connection( )); match futures::future::try_join(send_requests_task, recv_requests_task).await { Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"), - Ok(((), Err(err))) => { - debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); - record_error(&err, &stats); + Ok(out) => { + if let (Err(ref err), _) = out { + debug!("send_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); + } + if let (_, Err(ref err)) = out { + debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}"); + record_error(err, &stats); + } } - Ok(((), Ok(()))) => (), } drop_connection(remote_pubkey, &connection, &cache).await; if let Entry::Occupied(entry) = router.write().await.entry(remote_address) { @@ -513,15 +518,27 @@ async fn send_requests_task( connection: Connection, mut receiver: AsyncReceiver, stats: Arc, -) { - while let Some(request) = receiver.recv().await { - tokio::task::spawn(send_request_task( - endpoint.clone(), - remote_address, - connection.clone(), - request, - stats.clone(), - )); +) -> Result<(), Error> { + tokio::pin! { + let connection_closed = connection.closed(); + } + loop { + tokio::select! { + biased; + request = receiver.recv() => { + match request { + None => return Ok(()), + Some(request) => tokio::task::spawn(send_request_task( + endpoint.clone(), + remote_address, + connection.clone(), + request, + stats.clone(), + )), + }; + } + err = &mut connection_closed => return Err(Error::from(err)), + } } } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 62733953cc724f..bbb130573cab9a 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -50,7 +50,7 @@ impl ShredFetchStage { .as_ref() .map(|(_, cluster_info)| cluster_info.keypair().clone()); - let (mut last_root, mut last_slot, mut slots_per_epoch) = { + let (mut last_root, mut slots_per_epoch, mut last_slot) = { let bank_forks_r = bank_forks.read().unwrap(); let root_bank = bank_forks_r.root_bank(); ( diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 5066a028336330..6e618154f20751 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -406,6 +406,35 @@ impl ErasureMeta { } } +#[allow(dead_code)] +impl MerkleRootMeta { + pub(crate) fn from_shred(shred: &Shred) -> Self { + Self { + // An error here after the shred has already sigverified + // can only indicate that the leader is sending + // legacy or malformed shreds. We should still store + // `None` for those cases in blockstore, as a later + // shred that contains a proper merkle root would constitute + // a valid duplicate shred proof. + merkle_root: shred.merkle_root().ok(), + first_received_shred_index: shred.index(), + first_received_shred_type: shred.shred_type(), + } + } + + pub(crate) fn merkle_root(&self) -> Option { + self.merkle_root + } + + pub(crate) fn first_received_shred_index(&self) -> u32 { + self.first_received_shred_index + } + + pub(crate) fn first_received_shred_type(&self) -> ShredType { + self.first_received_shred_type + } +} + impl DuplicateSlotProof { pub(crate) fn new(shred1: Vec, shred2: Vec) -> Self { DuplicateSlotProof { shred1, shred2 } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 5fda160e29b976..4cd4f8b85b918a 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -334,6 +334,7 @@ impl Shred { dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); dispatch!(pub fn into_payload(self) -> Vec); + dispatch!(pub fn merkle_root(&self) -> Result); dispatch!(pub fn payload(&self) -> &Vec); dispatch!(pub fn sanitize(&self) -> Result<(), Error>); diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index 4f1cd22111e07f..d0d507ae48aba1 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -154,7 +154,7 @@ impl ShredData { Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size)?) } - fn merkle_root(&self) -> Result { + pub(super) fn merkle_root(&self) -> Result { let proof_size = self.proof_size()?; let index = self.erasure_shard_index()?; let proof_offset = Self::proof_offset(proof_size)?; @@ -266,7 +266,7 @@ impl ShredCode { Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size)?) } - fn merkle_root(&self) -> Result { + pub(super) fn merkle_root(&self) -> Result { let proof_size = self.proof_size()?; let index = self.erasure_shard_index()?; let proof_offset = Self::proof_offset(proof_size)?; diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index ba85d92af25187..0ad97a0f729a77 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -6,7 +6,7 @@ use { CodingShredHeader, Error, ShredCommonHeader, ShredType, SignedData, DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE, }, - solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature}, + solana_sdk::{clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, signature::Signature}, static_assertions::const_assert_eq, }; @@ -47,6 +47,13 @@ impl ShredCode { } } + pub(super) fn merkle_root(&self) -> Result { + match self { + Self::Legacy(_) => Err(Error::InvalidShredType), + Self::Merkle(shred) => shred.merkle_root(), + } + } + pub(super) fn new_from_parity_shard( slot: Slot, index: u32, diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs index 9bf2c0bf05f79e..ecb40367b4ef08 100644 --- a/ledger/src/shred/shred_data.rs +++ b/ledger/src/shred/shred_data.rs @@ -7,7 +7,7 @@ use { DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredType, ShredVariant, SignedData, MAX_DATA_SHREDS_PER_SLOT, }, - solana_sdk::{clock::Slot, signature::Signature}, + solana_sdk::{clock::Slot, hash::Hash, signature::Signature}, }; #[derive(Clone, Debug, Eq, PartialEq)] @@ -41,6 +41,13 @@ impl ShredData { } } + pub(super) fn merkle_root(&self) -> Result { + match self { + Self::Legacy(_) => Err(Error::InvalidShredType), + Self::Merkle(shred) => shred.merkle_root(), + } + } + pub(super) fn new_from_data( slot: Slot, index: u32, diff --git a/turbine/src/quic_endpoint.rs b/turbine/src/quic_endpoint.rs index e8a316420b42d8..a947f212296fb7 100644 --- a/turbine/src/quic_endpoint.rs +++ b/turbine/src/quic_endpoint.rs @@ -435,10 +435,21 @@ async fn send_datagram_task( connection: Connection, mut receiver: AsyncReceiver, ) -> Result<(), Error> { - while let Some(bytes) = receiver.recv().await { - connection.send_datagram(bytes)?; + tokio::pin! { + let connection_closed = connection.closed(); + } + loop { + tokio::select! { + biased; + bytes = receiver.recv() => { + match bytes { + None => return Ok(()), + Some(bytes) => connection.send_datagram(bytes)?, + } + } + err = &mut connection_closed => return Err(Error::from(err)), + } } - Ok(()) } async fn make_connection_task(