Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions node/network/bitfield-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
maplit = "1.0.2"
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
tempfile = "3.1.0"
227 changes: 187 additions & 40 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,16 @@ where
// check interest in the peer in this message's relay parent
if view.contains(&message.relay_parent) {
// track the message as sent for this peer
message_sent_to_peer
let is_new = message_sent_to_peer
.entry(peer.clone())
.or_default()
.insert(validator.clone());

Some(peer.clone())
if is_new {
Some(peer.clone())
} else {
None
}
} else {
None
}
Expand Down Expand Up @@ -443,6 +447,9 @@ where
}
one_per_validator.insert(validator.clone(), message.clone());

// If the peer has sent us a message, we don't need to send him the same.
job_data.message_sent_to_peer.entry(origin.clone()).or_default().insert(validator.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is the set of received messages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good point. But why should we update the message_sent_to_peer if we've only received the message? It seems like we should only update the message_received_from_peer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also check message_received_from_peer in the interested_peers computation in relay_peers, I guess.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good point. But why should we update the message_sent_to_peer if we've only received the message? It seems like we should only update the message_received_from_peer

If I have received the message from you, what is the benefit of sending you the same message again? This would happen below in `relay_message'. While you will not decrease my reputation for this, as I did not yet send you this message, I just waste my bandwidth for a message that is already known by you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who says they're the same message? We only know that they're from the same validator. The bitfield distribution section of the guide goes deeper into the rationale.

That is a good point, but we actually don't account for this in the code? As far as I have seen, we only check if we have send/received a message based on the validator is message is coming from.

You are saying that we actually should look into the message and check if they are different, is that correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are saying that we actually should look into the message and check if they are different, is that correct?

No, the change I suggest is just to check if we've received a message from the peer before sending a message. I think the thing this is trying to work around is where both peers send each other the message at the same time and then report each other.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't try to work around this and this did not happen. We also just report a peer if it sends us the same message twice. We don't check if we send the message to this peer already for reporting the peer. Because as you said, this is very likely to end in a race condition.

However, I will do your requested changes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other thing that I spoke about is that the code currently assumes that each validator only sends one bitfield per relay chain. Will this always be true or will we may change this in the future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will be true for the foreseeable future


relay_message(ctx, job_data, &mut state.peer_views, validator, message).await;

modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
Expand Down Expand Up @@ -529,11 +536,7 @@ async fn handle_peer_view_change<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let current = state.peer_views.entry(origin.clone()).or_default();

let added: Vec<Hash> = view.difference(&*current).cloned().collect();

*current = view;
let added = state.peer_views.entry(origin.clone()).or_default().replace_difference(view).cloned().collect::<Vec<_>>();

// Send all messages we've seen before and the peer is now interested
// in to that peer.
Expand Down Expand Up @@ -755,7 +758,7 @@ mod test {
use polkadot_node_subsystem_util::TimeoutExt;
use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore};
use sp_application_crypto::AppKey;
use sc_keystore::LocalKeystore;
use sp_keystore::testing::KeyStore;
use std::sync::Arc;
use std::time::Duration;
use assert_matches::assert_matches;
Expand All @@ -767,12 +770,6 @@ mod test {
];
}

macro_rules! peers {
( $( $peer:expr ),* $(,)? ) => [
vec![ $( $peer.clone() ),* ]
];
}

macro_rules! launch {
($fut:expr) => {
$fut
Expand Down Expand Up @@ -816,7 +813,6 @@ mod test {
fn state_with_view(
view: View,
relay_parent: Hash,
keystore_path: &tempfile::TempDir,
) -> (ProtocolState, SigningContext, SyncCryptoStorePtr, ValidatorId) {
let mut state = ProtocolState::default();

Expand All @@ -825,8 +821,7 @@ mod test {
parent_hash: relay_parent.clone(),
};

let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
.expect("Creates keystore"));
let keystore : SyncCryptoStorePtr = Arc::new(KeyStore::new());
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
.expect("generating sr25519 key not to fail");

Expand Down Expand Up @@ -865,9 +860,7 @@ mod test {
};

// another validator not part of the validatorset
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
.expect("Creates keystore"));
let keystore : SyncCryptoStorePtr = Arc::new(KeyStore::new());
let malicious = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
.expect("Malicious key created");
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
Expand Down Expand Up @@ -929,10 +922,8 @@ mod test {
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);

let keystore_path = tempfile::tempdir().expect("Creates keystore path");
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());

state.peer_views.insert(peer_b.clone(), view![hash_a]);

Expand Down Expand Up @@ -985,10 +976,8 @@ mod test {
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);

let keystore_path = tempfile::tempdir().expect("Creates keystore path");
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());

// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
Expand Down Expand Up @@ -1085,6 +1074,97 @@ mod test {
});
}

#[test]
fn do_not_relay_message_twice() {
let _ = env_logger::builder()
.filter(None, log::LevelFilter::Trace)
.is_test(true)
.try_init();

let hash = Hash::random();

let peer_a = PeerId::random();
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);

// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash.clone());

// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed_bitfield =
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
.expect("should be signed");

state.peer_views.insert(peer_b.clone(), view![hash]);
state.peer_views.insert(peer_a.clone(), view![hash]);

let msg = BitfieldGossipMessage {
relay_parent: hash.clone(),
signed_availability: signed_bitfield.clone(),
};

let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);

executor::block_on(async move {
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
&mut state.peer_views,
validator.clone(),
msg.clone(),
).await;

assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(h, signed)
)) => {
assert_eq!(h, hash);
assert_eq!(signed, signed_bitfield)
}
);

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, send_msg),
) => {
assert_eq!(2, peers.len());
assert!(peers.contains(&peer_a));
assert!(peers.contains(&peer_b));
assert_eq!(send_msg, msg.clone().into_validation_protocol());
}
);

// Relaying the message a second time shouldn't work.
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
&mut state.peer_views,
validator.clone(),
msg.clone(),
).await;

assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(h, signed)
)) => {
assert_eq!(h, hash);
assert_eq!(signed, signed_bitfield)
}
);

// There shouldn't be any other message
assert!(handle.recv().timeout(Duration::from_millis(10)).await.is_none());
});
}

#[test]
fn changing_view() {
let _ = env_logger::builder()
Expand All @@ -1099,10 +1179,8 @@ mod test {
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);

let keystore_path = tempfile::tempdir().expect("Creates keystore path");
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());

// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
Expand Down Expand Up @@ -1160,17 +1238,6 @@ mod test {
}
);

// gossip to the network
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage (
peers, out_msg,
)) => {
assert_eq!(peers, peers![peer_b]);
assert_eq!(out_msg, msg.clone().into_validation_protocol());
}
);

// reputation change for peer B
assert_matches!(
handle.recv().await,
Expand Down Expand Up @@ -1253,4 +1320,84 @@ mod test {

});
}

#[test]
fn do_not_send_message_back_to_origin() {
let _ = env_logger::builder()
.filter(None, log::LevelFilter::Trace)
.is_test(true)
.try_init();

let hash: Hash = [0; 32].into();

let peer_a = PeerId::random();
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);

// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash);

// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed_bitfield =
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind wrapping this and other lines introduced by this PR that grew unwieldly?

.expect("should be signed");

state.peer_views.insert(peer_b.clone(), view![hash]);
state.peer_views.insert(peer_a.clone(), view![hash]);

let msg = BitfieldGossipMessage {
relay_parent: hash.clone(),
signed_availability: signed_bitfield.clone(),
};

let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);

executor::block_on(async move {
// send a first message
launch!(handle_network_msg(
&mut ctx,
&mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
msg.clone().into_network_message(),
),
));

assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(hash, signed)
)) => {
assert_eq!(hash, hash);
assert_eq!(signed, signed_bitfield)
}
);

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, send_msg),
) => {
assert_eq!(1, peers.len());
assert!(peers.contains(&peer_a));
assert_eq!(send_msg, msg.clone().into_validation_protocol());
}
);

assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
}
);
});
}
}
9 changes: 9 additions & 0 deletions node/network/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ impl<M> NetworkBridgeEvent<M> {
pub struct View(pub Vec<Hash>);

impl View {
/// Replace `self` with `new`.
///
/// Returns an iterator that will yield all elements of `new` that were not part of `self`.
pub fn replace_difference(&mut self, new: View) -> impl Iterator<Item = &Hash> {
let old = std::mem::replace(self, new);

self.0.iter().filter(move |h| !old.contains(h))
}

/// Returns an iterator of the hashes present in `Self` but not in `other`.
pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a {
self.0.iter().filter(move |h| !other.contains(h))
Expand Down