-
Notifications
You must be signed in to change notification settings - Fork 5.5k
blockstore: populate duplicate shred proofs for merkle root conflicts #34270
Changes from all commits
aabcb94
ce7ac2f
10e79c8
69db736
1210a01
2a5a575
bf6fea9
0c1f848
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -144,6 +144,7 @@ pub enum PossibleDuplicateShred { | |
| Exists(Shred), // Blockstore has another shred in its spot | ||
| LastIndexConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The index of this shred conflicts with `slot_meta.last_index` | ||
| ErasureConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The coding shred has a conflict in the erasure_meta | ||
| MerkleRootConflict(/* original */ Shred, /* conflict */ Vec<u8>), // Merkle root conflict in the same fec set | ||
| } | ||
|
|
||
| impl PossibleDuplicateShred { | ||
|
|
@@ -152,6 +153,7 @@ impl PossibleDuplicateShred { | |
| Self::Exists(shred) => shred.slot(), | ||
| Self::LastIndexConflict(shred, _) => shred.slot(), | ||
| Self::ErasureConflict(shred, _) => shred.slot(), | ||
| Self::MerkleRootConflict(shred, _) => shred.slot(), | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -1257,6 +1259,21 @@ impl Blockstore { | |
| metrics.num_coding_shreds_invalid += 1; | ||
| return false; | ||
| } | ||
|
|
||
| if let Some(merkle_root_meta) = merkle_root_metas.get(&erasure_set) { | ||
| // A previous shred has been inserted in this batch or in blockstore | ||
| // Compare our current shred against the previous shred for potential | ||
| // conflicts | ||
| if !self.check_merkle_root_consistency( | ||
| just_received_shreds, | ||
| slot, | ||
| merkle_root_meta.as_ref(), | ||
| &shred, | ||
| duplicate_shreds, | ||
| ) { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let erasure_meta_entry = erasure_metas.entry(erasure_set).or_insert_with(|| { | ||
|
|
@@ -1476,6 +1493,21 @@ impl Blockstore { | |
| ) { | ||
| return Err(InsertDataShredError::InvalidShred); | ||
| } | ||
|
|
||
| if let Some(merkle_root_meta) = merkle_root_metas.get(&erasure_set) { | ||
AshwinSekar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // A previous shred has been inserted in this batch or in blockstore | ||
| // Compare our current shred against the previous shred for potential | ||
| // conflicts | ||
| if !self.check_merkle_root_consistency( | ||
| just_inserted_shreds, | ||
| slot, | ||
| merkle_root_meta.as_ref(), | ||
| &shred, | ||
| duplicate_shreds, | ||
| ) { | ||
| return Err(InsertDataShredError::InvalidShred); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let newly_completed_data_sets = self.insert_data_shred( | ||
|
|
@@ -1532,22 +1564,81 @@ impl Blockstore { | |
| shred_index < slot_meta.consumed || data_index.contains(shred_index) | ||
| } | ||
|
|
||
| fn get_data_shred_from_just_inserted_or_db<'a>( | ||
| /// Finds the corresponding shred at `shred_id` in the just inserted | ||
| /// shreds or the backing store. Returns None if there is no shred. | ||
| fn get_shred_from_just_inserted_or_db<'a>( | ||
| &'a self, | ||
| just_inserted_shreds: &'a HashMap<ShredId, Shred>, | ||
| slot: Slot, | ||
| index: u64, | ||
| ) -> Cow<'a, Vec<u8>> { | ||
| let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data); | ||
| if let Some(shred) = just_inserted_shreds.get(&key) { | ||
| Cow::Borrowed(shred.payload()) | ||
| } else { | ||
| shred_id: ShredId, | ||
| ) -> Option<Cow<'a, Vec<u8>>> { | ||
| let (slot, index, shred_type) = shred_id.unpack(); | ||
| match (just_inserted_shreds.get(&shred_id), shred_type) { | ||
| (Some(shred), _) => Some(Cow::Borrowed(shred.payload())), | ||
| // If it doesn't exist in the just inserted set, it must exist in | ||
| // the backing store | ||
| Cow::Owned(self.get_data_shred(slot, index).unwrap().unwrap()) | ||
| (_, ShredType::Data) => self | ||
| .get_data_shred(slot, u64::from(index)) | ||
| .unwrap() | ||
| .map(Cow::Owned), | ||
| (_, ShredType::Code) => self | ||
| .get_coding_shred(slot, u64::from(index)) | ||
| .unwrap() | ||
| .map(Cow::Owned), | ||
| } | ||
| } | ||
|
|
||
| /// Returns true if there is no merkle root conflict between | ||
| /// the existing `merkle_root_meta` and `shred` | ||
| /// | ||
| /// Otherwise return false and if not already present, add duplicate proof to | ||
| /// `duplicate_shreds`. | ||
| fn check_merkle_root_consistency( | ||
| &self, | ||
| just_inserted_shreds: &HashMap<ShredId, Shred>, | ||
| slot: Slot, | ||
| merkle_root_meta: &MerkleRootMeta, | ||
| shred: &Shred, | ||
| duplicate_shreds: &mut Vec<PossibleDuplicateShred>, | ||
| ) -> bool { | ||
| let new_merkle_root = shred.merkle_root().ok(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we check for the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also considered this as an option - since this is code is temporary until the feature flag is turned on I went with this approach to keep the PR small. There are 183 calls (mostly all testing code) for I figured it would be easiest to check the feature flag in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fine, if there is no correctness concern.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we don't insert into blockstore until window service, other variants of duplicate proofs will not be precluded from being processed if the feature flag is not present. additionally because they are separate enum variants, multiple signals can be sent to window service if a block violates multiple duplicate conditions. |
||
| if merkle_root_meta.merkle_root() == new_merkle_root { | ||
| // No conflict, either both merkle shreds with same merkle root | ||
| // or both legacy shreds with merkle_root `None` | ||
AshwinSekar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return true; | ||
| } | ||
|
|
||
| warn!( | ||
| "Received conflicting merkle roots for slot: {}, erasure_set: {:?} | ||
| original merkle root meta {:?} vs | ||
| conflicting merkle root {:?} shred index {} type {:?}. Reporting as duplicate", | ||
| slot, | ||
| shred.erasure_set(), | ||
| merkle_root_meta, | ||
| new_merkle_root, | ||
| shred.index(), | ||
| shred.shred_type(), | ||
| ); | ||
|
|
||
| if !self.has_duplicate_shreds_in_slot(slot) { | ||
| let shred_id = ShredId::new( | ||
| slot, | ||
| merkle_root_meta.first_received_shred_index(), | ||
| merkle_root_meta.first_received_shred_type(), | ||
| ); | ||
| let conflicting_shred = self | ||
| .get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id) | ||
| .unwrap_or_else(|| { | ||
| panic!("First received shred indicated by merkle root meta {:?} is missing from blockstore. This inconsistency may cause duplicate block detection to fail", merkle_root_meta); | ||
| }) | ||
| .into_owned(); | ||
| duplicate_shreds.push(PossibleDuplicateShred::MerkleRootConflict( | ||
| shred.clone(), | ||
| conflicting_shred, | ||
| )); | ||
| } | ||
| false | ||
| } | ||
|
|
||
| fn should_insert_data_shred( | ||
| &self, | ||
| shred: &Shred, | ||
|
|
@@ -1575,12 +1666,16 @@ impl Blockstore { | |
| .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); | ||
|
|
||
| if !self.has_duplicate_shreds_in_slot(slot) { | ||
| let shred_id = ShredId::new( | ||
| slot, | ||
| u32::try_from(last_index.unwrap()).unwrap(), | ||
| ShredType::Data, | ||
| ); | ||
| let ending_shred: Vec<u8> = self | ||
| .get_data_shred_from_just_inserted_or_db( | ||
| just_inserted_shreds, | ||
| slot, | ||
| last_index.unwrap(), | ||
| ) | ||
| .get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id) | ||
| .unwrap_or_else(|| { | ||
| panic!("Last index data shred indicated by slot meta {:?} is missing from blockstore. This inconsistency may cause duplicate block detection to fail", slot_meta) | ||
| }) | ||
| .into_owned(); | ||
|
|
||
| if self | ||
|
|
@@ -1614,12 +1709,16 @@ impl Blockstore { | |
| .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); | ||
|
|
||
| if !self.has_duplicate_shreds_in_slot(slot) { | ||
| let shred_id = ShredId::new( | ||
| slot, | ||
| u32::try_from(slot_meta.received - 1).unwrap(), | ||
| ShredType::Data, | ||
| ); | ||
| let ending_shred: Vec<u8> = self | ||
| .get_data_shred_from_just_inserted_or_db( | ||
| just_inserted_shreds, | ||
| slot, | ||
| slot_meta.received - 1, | ||
| ) | ||
| .get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id) | ||
| .unwrap_or_else(|| { | ||
| panic!("Last received data shred indicated by slot meta {:?} is missing from blockstore. This inconsistency may cause duplicate block detection to fail", slot_meta) | ||
| }) | ||
| .into_owned(); | ||
|
|
||
| if self | ||
|
|
@@ -6887,7 +6986,7 @@ pub mod tests { | |
| let mut write_batch = blockstore.db.batch().unwrap(); | ||
| let mut duplicates = vec![]; | ||
|
|
||
| assert!(blockstore.check_insert_coding_shred( | ||
| assert!(!blockstore.check_insert_coding_shred( | ||
| new_coding_shred.clone(), | ||
| &mut erasure_metas, | ||
| &mut merkle_root_metas, | ||
|
|
@@ -6901,6 +7000,13 @@ pub mod tests { | |
| &mut BlockstoreInsertionMetrics::default(), | ||
| )); | ||
|
|
||
| // No insert, notify duplicate | ||
| assert_eq!(duplicates.len(), 1); | ||
| match &duplicates[0] { | ||
| PossibleDuplicateShred::MerkleRootConflict(shred, _) if shred.slot() == slot => (), | ||
| _ => panic!("No merkle root conflict"), | ||
| } | ||
|
|
||
| // Verify that we still have the merkle root meta from the original shred | ||
| assert_eq!(merkle_root_metas.len(), 1); | ||
| assert_eq!( | ||
|
|
@@ -7095,7 +7201,14 @@ pub mod tests { | |
| None, | ||
| ShredSource::Turbine, | ||
| ) | ||
| .is_ok()); | ||
| .is_err()); | ||
|
|
||
| // No insert, notify duplicate | ||
| assert_eq!(duplicates.len(), 1); | ||
| assert_matches!( | ||
| duplicates[0], | ||
| PossibleDuplicateShred::MerkleRootConflict(_, _) | ||
| ); | ||
|
|
||
| // Verify that we still have the merkle root meta from the original shred | ||
| assert_eq!(merkle_root_metas.len(), 1); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why blockstore doesn't check the feature flag before sending this?