1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

22 changes: 22 additions & 0 deletions core/src/replay_stage.rs
@@ -2824,6 +2824,28 @@ impl ReplayStage {
                 purge_repair_slot_counter,
                 SlotStateUpdate::BankFrozen(bank_frozen_state),
             );
+            // If we are replaying from ledger and previously marked this slot as duplicate, let the state machine know
+            if blockstore.get_duplicate_slot(bank.slot()).is_some() {
+                let duplicate_state = DuplicateState::new_from_state(
+                    bank.slot(),
+                    gossip_duplicate_confirmed_slots,
+                    heaviest_subtree_fork_choice,
+                    || false,
+                    || Some(bank.hash()),
+                );
+                check_slot_agrees_with_cluster(
+                    bank.slot(),
+                    bank_forks.read().unwrap().root(),
+                    blockstore,
+                    duplicate_slots_tracker,
+                    epoch_slots_frozen_slots,
+                    heaviest_subtree_fork_choice,
+                    duplicate_slots_to_repair,
+                    ancestor_hashes_replay_update_sender,
+                    purge_repair_slot_counter,
+                    SlotStateUpdate::Duplicate(duplicate_state),
+                );
+            }
             if let Some(sender) = bank_notification_sender {
                 sender
                     .sender
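
Note: the block added above exists because the fork-choice duplicate-slot state is held in memory, while the duplicate proof is persisted in blockstore; after a restart, replay must re-notify the state machine for slots marked duplicate in a previous run. A minimal, self-contained sketch of that idea (all types below are stubs named for illustration, not the real solana-core ones):

```rust
use std::collections::{BTreeSet, HashMap};

type Slot = u64;

// Stub of the persisted duplicate-proof column in blockstore.
struct Blockstore {
    duplicate_proofs: HashMap<Slot, Vec<u8>>,
}

impl Blockstore {
    fn get_duplicate_slot(&self, slot: Slot) -> Option<&Vec<u8>> {
        self.duplicate_proofs.get(&slot)
    }
}

// Stub of the in-memory tracker, which starts out empty after a restart.
#[derive(Default)]
struct DuplicateSlotsTracker(BTreeSet<Slot>);

// On freezing a replayed bank: if a proof was persisted in a previous run,
// tell the (in-memory) state machine about it again.
fn on_bank_frozen(slot: Slot, blockstore: &Blockstore, tracker: &mut DuplicateSlotsTracker) {
    if blockstore.get_duplicate_slot(slot).is_some() && tracker.0.insert(slot) {
        println!("slot {slot} re-marked duplicate for fork choice");
    }
}

fn main() {
    let blockstore = Blockstore {
        duplicate_proofs: HashMap::from([(5, vec![1, 2, 3])]),
    };
    let mut tracker = DuplicateSlotsTracker::default();
    on_bank_frozen(5, &blockstore, &mut tracker); // proof persisted: re-notifies
    on_bank_frozen(6, &blockstore, &mut tracker); // no proof stored: no-op
}
```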
3 changes: 2 additions & 1 deletion core/src/tvu.rs
@@ -214,7 +214,7 @@ impl Tvu {
             leader_schedule_cache.clone(),
             verified_vote_receiver,
             completed_data_sets_sender,
-            duplicate_slots_sender,
+            duplicate_slots_sender.clone(),
             ancestor_hashes_replay_update_receiver,
             dumped_slots_receiver,
             popular_pruned_forks_sender,
@@ -322,6 +322,7 @@ impl Tvu {
                 blockstore,
                 leader_schedule_cache.clone(),
                 bank_forks.clone(),
+                duplicate_slots_sender,
             ),
         );

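Note: the `.clone()` in the first hunk is what frees the original `duplicate_slots_sender` to be handed to a second component later in `Tvu::new` (second hunk); the channel now has two producers feeding one duplicate-slot consumer. A minimal sketch of that pattern, written with `std::sync::mpsc` so it runs with no dependencies (the real `DuplicateSlotSender` is a channel-sender alias in solana-core, likely crossbeam-based):

```rust
use std::sync::mpsc::channel;
use std::thread;

type Slot = u64;

fn main() {
    let (duplicate_slots_sender, duplicate_slots_receiver) = channel::<Slot>();

    // First producer gets a clone (analogous to the first hunk above).
    let first_sender = duplicate_slots_sender.clone();
    let first = thread::spawn(move || first_sender.send(100).unwrap());

    // Second producer takes ownership of the original (second hunk).
    let second = thread::spawn(move || duplicate_slots_sender.send(101).unwrap());

    first.join().unwrap();
    second.join().unwrap();

    // One consumer drains both producers; iteration ends once every
    // sender has been dropped.
    let mut slots: Vec<Slot> = duplicate_slots_receiver.iter().collect();
    slots.sort_unstable();
    assert_eq!(slots, vec![100, 101]);
}
```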
60 changes: 40 additions & 20 deletions core/src/window_service.rs
@@ -19,7 +19,7 @@ use {
     rayon::{prelude::*, ThreadPool},
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
-        blockstore::{Blockstore, BlockstoreInsertionMetrics},
+        blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
         leader_schedule_cache::LeaderScheduleCache,
         shred::{self, Nonce, ReedSolomonCache, Shred},
     },
@@ -138,23 +138,41 @@ impl WindowServiceMetrics {
 fn run_check_duplicate(
     cluster_info: &ClusterInfo,
     blockstore: &Blockstore,
-    shred_receiver: &Receiver<Shred>,
+    shred_receiver: &Receiver<PossibleDuplicateShred>,
     duplicate_slots_sender: &DuplicateSlotSender,
 ) -> Result<()> {
-    let check_duplicate = |shred: Shred| -> Result<()> {
+    let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
         let shred_slot = shred.slot();
-        if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
-            if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) {
-                cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
-                blockstore.store_duplicate_slot(
-                    shred_slot,
-                    existing_shred_payload,
-                    shred.into_payload(),
-                )?;
-
-                duplicate_slots_sender.send(shred_slot)?;
-            }
-        }
+        let (shred1, shred2) = match shred {
+            PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
+            PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
+            PossibleDuplicateShred::Exists(shred) => {
+                // Unlike the other cases we have to wait until here to decide to handle the duplicate and store
+                // in blockstore. This is because the duplicate could have been part of the same insert batch,
+                // so we wait until the batch has been written.
+                if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
+                    if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) {
+                        blockstore.store_duplicate_slot(
+                            shred_slot,
+                            existing_shred_payload.clone(),
+                            shred.clone().into_payload(),
+                        )?;
+                        (shred, existing_shred_payload)
+                    } else {
+                        // Shred is not duplicate
+                        return Ok(());
+                    }
+                } else {
+                    // Shred has already been handled
+                    return Ok(());
+                }
+            }
+        };
+
+        // Propagate duplicate proof through gossip
+        cluster_info.push_duplicate_shred(&shred1, &shred2)?;
+        // Notify duplicate consensus state machine
+        duplicate_slots_sender.send(shred_slot)?;

         Ok(())
     };
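
Note: the diff uses `PossibleDuplicateShred` without showing its definition. Inferred from the match arms above, it is presumably shaped roughly like the sketch below (the real enum lives in `solana_ledger::blockstore`; the stub types and field layout here are assumptions):

```rust
// Stub types so the sketch compiles standalone; the real variants carry a
// solana_ledger::shred::Shred plus the conflicting payload bytes.
type Shred = Vec<u8>;
type ShredPayload = Vec<u8>;

#[allow(dead_code)]
enum PossibleDuplicateShred {
    // Blockstore already holds a shred at this (slot, index). Whether it is a
    // conflicting version is only decidable after the insert batch is written,
    // which is why run_check_duplicate defers that check to this point.
    Exists(Shred),
    // The shred's index conflicts with the slot's recorded last index; the
    // conflicting payload is already in hand, so it is a proof immediately.
    LastIndexConflict(Shred, ShredPayload),
    // A coding shred conflicts with the slot's erasure metadata.
    ErasureConflict(Shred, ShredPayload),
}

fn main() {
    // The two conflict variants carry both halves of a duplicate proof.
    let dup = PossibleDuplicateShred::LastIndexConflict(vec![1], vec![2]);
    if let PossibleDuplicateShred::LastIndexConflict(shred, conflict) = dup {
        assert_ne!(shred, conflict);
    }
}
```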
@@ -226,7 +244,7 @@ fn run_insert<F>(
     reed_solomon_cache: &ReedSolomonCache,
 ) -> Result<()>
 where
-    F: Fn(Shred),
+    F: Fn(PossibleDuplicateShred),
 {
     const RECV_TIMEOUT: Duration = Duration::from_millis(200);
     let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
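
Note: widening the bound from `Fn(Shred)` to `Fn(PossibleDuplicateShred)` keeps the existing design, in which `run_insert` detects possible duplicates but stays generic over what is done with them. A toy sketch of that seam (stub types; the real handler sends into the check-duplicate channel):

```rust
use std::collections::HashSet;

// Toy stand-in for PossibleDuplicateShred.
#[derive(Debug)]
struct PossibleDuplicate(u64);

// Generic over the duplicate handler, like run_insert's F parameter: the
// insert loop reports possible duplicates but does not decide their fate.
fn run_insert<F>(shreds: &[u64], handle_duplicate: F)
where
    F: Fn(PossibleDuplicate),
{
    let mut seen = HashSet::new();
    for &shred in shreds {
        if !seen.insert(shred) {
            handle_duplicate(PossibleDuplicate(shred));
        }
    }
}

fn main() {
    // The window service wires the handler to a channel send; here we print.
    run_insert(&[3, 5, 3], |dup| println!("possible duplicate: {dup:?}"));
}
```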
@@ -370,7 +388,7 @@ impl WindowService {
         cluster_info: Arc<ClusterInfo>,
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
-        duplicate_receiver: Receiver<Shred>,
+        duplicate_receiver: Receiver<PossibleDuplicateShred>,
         duplicate_slots_sender: DuplicateSlotSender,
     ) -> JoinHandle<()> {
         let handle_error = || {
@@ -400,7 +418,7 @@ impl WindowService {
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         verified_receiver: Receiver<Vec<PacketBatch>>,
-        check_duplicate_sender: Sender<Shred>,
+        check_duplicate_sender: Sender<PossibleDuplicateShred>,
         completed_data_sets_sender: CompletedDataSetsSender,
         retransmit_sender: Sender<Vec<ShredPayload>>,
         outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
@@ -417,8 +435,8 @@
         Builder::new()
             .name("solWinInsert".to_string())
             .spawn(move || {
-                let handle_duplicate = |shred| {
-                    let _ = check_duplicate_sender.send(shred);
+                let handle_duplicate = |possible_duplicate_shred| {
+                    let _ = check_duplicate_sender.send(possible_duplicate_shred);
                 };
                 let mut metrics = BlockstoreInsertionMetrics::default();
                 let mut ws_metrics = WindowServiceMetrics::default();
@@ -551,7 +569,9 @@ mod test {
         };
         assert_eq!(duplicate_shred.slot(), shreds[0].slot());
         let duplicate_shred_slot = duplicate_shred.slot();
-        sender.send(duplicate_shred.clone()).unwrap();
+        sender
+            .send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
+            .unwrap();
         assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
         let keypair = Keypair::new();
         let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
1 change: 1 addition & 0 deletions gossip/Cargo.toml
@@ -53,6 +53,7 @@ thiserror = { workspace = true }
 [dev-dependencies]
 num_cpus = { workspace = true }
 serial_test = { workspace = true }
+test-case = { workspace = true }

 [build-dependencies]
 rustc_version = { workspace = true }
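Note: `test-case` is presumably added to parameterize the new gossip duplicate-shred tests over several inputs. A hypothetical example of the crate's attribute syntax (not a test from this PR):

```rust
#[cfg(test)]
mod tests {
    use test_case::test_case;

    #[test_case(vec![1, 2, 3], vec![1, 2, 4], true ; "differing payloads form a proof")]
    #[test_case(vec![1, 2, 3], vec![1, 2, 3], false ; "identical payloads do not")]
    fn duplicate_proof_requires_differing_payloads(a: Vec<u8>, b: Vec<u8>, expected: bool) {
        assert_eq!(a != b, expected);
    }
}
```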
2 changes: 1 addition & 1 deletion gossip/src/crds_gossip.rs
@@ -149,7 +149,7 @@ impl CrdsGossip {
         let now = timestamp();
         for entry in entries {
             if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) {
-                error!("push_duplicate_shred faild: {:?}", err);
+                error!("push_duplicate_shred failed: {:?}", err);
             }
         }
         Ok(())