-
Notifications
You must be signed in to change notification settings - Fork 896
Add in metrics for detecting Redundant Pulls #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
04ef20b
de00d18
90707c3
6f7bd30
1d41d97
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,7 +75,7 @@ pub struct Crds { | |
| votes: BTreeMap<u64 /*insert order*/, usize /*index*/>, | ||
| // Indices of EpochSlots keyed by insert order. | ||
| epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>, | ||
| // Indices of DuplicateShred keyed by insert order. | ||
| // Indices of DuplicateShred keyed by insert order. | ||
|
||
| duplicate_shreds: BTreeMap<u64 /*insert order*/, usize /*index*/>, | ||
| // Indices of all crds values associated with a node. | ||
| records: HashMap<Pubkey, IndexSet<usize>>, | ||
|
|
@@ -115,6 +115,35 @@ pub(crate) struct CrdsDataStats { | |
| pub(crate) struct CrdsStats { | ||
| pub(crate) pull: CrdsDataStats, | ||
| pub(crate) push: CrdsDataStats, | ||
| pub(crate) redundant_pull: i64, | ||
|
||
| } | ||
|
|
||
| #[derive(PartialEq, Eq, Debug, Clone)] | ||
| struct NumPushRecv { | ||
| pub count: u8, | ||
| pub received_first_via_pull_response: bool, | ||
| } | ||
|
|
||
| impl NumPushRecv { | ||
| fn new(route: GossipRoute) -> Self { | ||
| let (count, received_first_via_pull_response) = match route { | ||
| GossipRoute::PullRequest => (0, true), | ||
| GossipRoute::PushMessage(_) => (1, false), | ||
| _ => (0, false), | ||
|
||
| }; | ||
| Self { | ||
| count, | ||
| received_first_via_pull_response, | ||
| } | ||
| } | ||
|
|
||
| // True while the value was first received via PullResponse and no | ||
| // PushMessage for it has been counted yet (count == 0). | ||
| // The caller checks this when a PushMessage for the same value arrives: | ||
| // such a push means the earlier pull of that value was redundant. | ||
| fn is_redundant_pull(&self) -> bool { | ||
| self.received_first_via_pull_response && self.count == 0 | ||
| } | ||
|
||
| } | ||
|
|
||
| /// This structure stores some local metadata associated with the CrdsValue | ||
|
|
@@ -127,8 +156,9 @@ pub struct VersionedCrdsValue { | |
| pub(crate) local_timestamp: u64, | ||
| /// value hash | ||
| pub(crate) value_hash: Hash, | ||
| /// Number of times duplicates of this value are received from gossip push. | ||
| num_push_dups: u8, | ||
| /// Tracks the number of times this value is received from gossip push, | ||
| /// and whether this value first entered crds via a pull response. | ||
| num_push_recv: NumPushRecv, | ||
| } | ||
|
|
||
| #[derive(Clone, Copy, Default)] | ||
|
|
@@ -147,14 +177,15 @@ impl Cursor { | |
| } | ||
|
|
||
| impl VersionedCrdsValue { | ||
| fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self { | ||
| fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64, route: GossipRoute) -> Self { | ||
| let value_hash = hash(&serialize(&value).unwrap()); | ||
| let num_push_recv = NumPushRecv::new(route); | ||
| VersionedCrdsValue { | ||
| ordinal: cursor.ordinal(), | ||
| value, | ||
| local_timestamp, | ||
| value_hash, | ||
| num_push_dups: 0u8, | ||
| num_push_recv, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -222,7 +253,7 @@ impl Crds { | |
| ) -> Result<(), CrdsError> { | ||
| let label = value.label(); | ||
| let pubkey = value.pubkey(); | ||
| let value = VersionedCrdsValue::new(value, self.cursor, now); | ||
| let value = VersionedCrdsValue::new(value, self.cursor, now, route); | ||
| match self.table.entry(label) { | ||
| Entry::Vacant(entry) => { | ||
| self.stats.lock().unwrap().record_insert(&value, route); | ||
|
|
@@ -303,8 +334,15 @@ impl Crds { | |
| Err(CrdsError::InsertFailed) | ||
| } else if matches!(route, GossipRoute::PushMessage(_)) { | ||
| let entry = entry.get_mut(); | ||
| entry.num_push_dups = entry.num_push_dups.saturating_add(1); | ||
| Err(CrdsError::DuplicatePush(entry.num_push_dups)) | ||
| if entry.num_push_recv.is_redundant_pull() { | ||
| self.stats.lock().unwrap().redundant_pull += 1; | ||
| } | ||
| entry.num_push_recv.count = entry.num_push_recv.count.saturating_add(1); | ||
| // num_push_recv.count tracks the number of received push messages, but we return | ||
| // the duplicate push message count, so we need to subtract 1. | ||
| Err(CrdsError::DuplicatePush( | ||
| entry.num_push_recv.count.saturating_sub(1), | ||
| )) | ||
| } else { | ||
| Err(CrdsError::InsertFailed) | ||
| } | ||
|
|
@@ -1450,8 +1488,9 @@ mod tests { | |
| #[allow(clippy::neg_cmp_op_on_partial_ord)] | ||
| fn test_equal() { | ||
| let val = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default())); | ||
| let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1); | ||
| let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1); | ||
| let v1 = | ||
| VersionedCrdsValue::new(val.clone(), Cursor::default(), 1, GossipRoute::LocalMessage); | ||
| let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1, GossipRoute::LocalMessage); | ||
| assert_eq!(v1, v2); | ||
| assert!(!(v1 != v2)); | ||
| assert!(!overrides(&v1.value, &v2)); | ||
|
|
@@ -1467,6 +1506,7 @@ mod tests { | |
| ))), | ||
| Cursor::default(), | ||
| 1, // local_timestamp | ||
| GossipRoute::LocalMessage, | ||
| ); | ||
| let v2 = VersionedCrdsValue::new( | ||
| { | ||
|
|
@@ -1476,6 +1516,7 @@ mod tests { | |
| }, | ||
| Cursor::default(), | ||
| 1, // local_timestamp | ||
| GossipRoute::LocalMessage, | ||
| ); | ||
|
|
||
| assert_eq!(v1.value.label(), v2.value.label()); | ||
|
|
@@ -1501,6 +1542,7 @@ mod tests { | |
| ))), | ||
| Cursor::default(), | ||
| 1, // local_timestamp | ||
| GossipRoute::LocalMessage, | ||
| ); | ||
| let v2 = VersionedCrdsValue::new( | ||
| CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( | ||
|
|
@@ -1509,6 +1551,7 @@ mod tests { | |
| ))), | ||
| Cursor::default(), | ||
| 1, // local_timestamp | ||
| GossipRoute::LocalMessage, | ||
| ); | ||
| assert_eq!(v1.value.label(), v2.value.label()); | ||
| assert!(overrides(&v1.value, &v2)); | ||
|
|
@@ -1527,6 +1570,7 @@ mod tests { | |
| ))), | ||
| Cursor::default(), | ||
| 1, // local_timestamp | ||
| GossipRoute::LocalMessage, | ||
| ); | ||
| let v2 = VersionedCrdsValue::new( | ||
| CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( | ||
|
|
@@ -1535,6 +1579,7 @@ mod tests { | |
| ))), | ||
| Cursor::default(), | ||
| 1, // local_timestamp | ||
| GossipRoute::LocalMessage, | ||
| ); | ||
| assert_ne!(v1, v2); | ||
| assert!(!(v1 == v2)); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we actually move this next to the other `pull_response_...` metrics?
https://github.com/anza-xyz/agave/blob/81075e60b/gossip/src/cluster_info_metrics.rs#L293-L317
We can name it something like `num_redundant_pull_responses`.