This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit (63 commits in total):
857883b Wip (eskimor, Mar 29, 2021)
4f11772 Merge branch 'master' into rk-req-res-runtime (eskimor, Mar 30, 2021)
7dc3c40 Increase proposer timeout. (eskimor, Apr 1, 2021)
7f428aa WIP. (eskimor, Apr 1, 2021)
c045773 Better timeout values now that we are going to be connected to all no… (eskimor, Mar 31, 2021)
fe1f0a1 Better and more consistent sizes. (eskimor, Apr 1, 2021)
8a0b22f Introduce statement fetching request. (eskimor, Apr 1, 2021)
86b147c WIP (eskimor, Apr 1, 2021)
d68d646 Statement cache retrieval logic. (eskimor, Apr 2, 2021)
707c96b Review remarks by @rphmeier (eskimor, Apr 3, 2021)
ded3f78 Fixes. (eskimor, Apr 5, 2021)
a64bfef Better requester logic. (eskimor, Apr 6, 2021)
5aa7dba WIP: Handle requester messages. (eskimor, Apr 6, 2021)
afc7f08 Missing dep. (eskimor, Apr 6, 2021)
10d7b8f Fix request launching logic. (eskimor, Apr 6, 2021)
351a0d1 Finish fetching logic. (eskimor, Apr 7, 2021)
d9b80cc Sending logic. (eskimor, Apr 7, 2021)
03e11ca Redo code size calculations. (eskimor, Apr 7, 2021)
0a57c1f Update Cargo.lock (new dep) (eskimor, Apr 7, 2021)
6ff978c Merge branch 'master' into rk-req-res-runtime (eskimor, Apr 7, 2021)
f4a1748 Get request receiver to statement distribution. (eskimor, Apr 7, 2021)
ad907dc Expose new functionality for responding to requests. (eskimor, Apr 7, 2021)
186ab0e Cleanup. (eskimor, Apr 7, 2021)
77c6d8c Responder logic. (eskimor, Apr 7, 2021)
41243fa Fixes + Cleanup. (eskimor, Apr 7, 2021)
6d76bbf Cargo.lock (eskimor, Apr 7, 2021)
52d1a69 Whitespace. (eskimor, Apr 7, 2021)
095f78e Add lost copyright. (eskimor, Apr 7, 2021)
129b0fa Launch responder task. (eskimor, Apr 7, 2021)
ae3c492 Typo. (eskimor, Apr 7, 2021)
49b4eae info -> warn (eskimor, Apr 7, 2021)
2e37f70 Typo. (eskimor, Apr 7, 2021)
c3e911a Fix. (eskimor, Apr 7, 2021)
98d366e Fix. (eskimor, Apr 7, 2021)
8118d0c Update comment. (eskimor, Apr 7, 2021)
3687401 Doc fix. (eskimor, Apr 7, 2021)
e3610d2 Better large statement heuristics. (eskimor, Apr 8, 2021)
f901615 Fix tests. (eskimor, Apr 8, 2021)
8b99d30 Merge branch 'master' into rk-req-res-runtime (eskimor, Apr 8, 2021)
e66dc8b Fix network bridge tests. (eskimor, Apr 8, 2021)
ce8e5b5 Add test for size estimate. (eskimor, Apr 8, 2021)
12f78a1 Very simple tests that checks we get LargeStatement. (eskimor, Apr 8, 2021)
b76af99 Basic check, that fetching of large candidates is performed. (eskimor, Apr 8, 2021)
7f2936b More tests. (eskimor, Apr 8, 2021)
f2883ff Merge branch 'master' into rk-req-res-runtime (eskimor, Apr 8, 2021)
2a56490 Basic metrics for responder. (eskimor, Apr 8, 2021)
653ea6a More metrics. (eskimor, Apr 8, 2021)
861e8d8 Use Encode::encoded_size(). (eskimor, Apr 8, 2021)
ba8b7fb Some useful spans. (eskimor, Apr 8, 2021)
42945ce Get rid of redundant metrics. (eskimor, Apr 8, 2021)
2fcde8f Don't add peer on duplicate. (eskimor, Apr 8, 2021)
6acc8ab Merge branch 'master' into rk-req-res-runtime (eskimor, Apr 8, 2021)
0b51e89 Properly check hash (eskimor, Apr 8, 2021)
606ff6b Merge branch 'master' into rk-req-res-runtime (eskimor, Apr 8, 2021)
ca11986 Preserve ordering + better flood protection. (eskimor, Apr 9, 2021)
3180253 Get rid of redundant clone. (eskimor, Apr 9, 2021)
5af1636 Don't shutdown responder on failed query. (eskimor, Apr 9, 2021)
5c4e316 Smaller fixes. (eskimor, Apr 9, 2021)
1454d6a Quotes. (eskimor, Apr 9, 2021)
48cac47 Better queue size calculation. (eskimor, Apr 9, 2021)
0f3a21c Merge branch 'master' into rk-req-res-runtime (eskimor, Apr 9, 2021)
e506d64 A bit saner response sizes. (eskimor, Apr 9, 2021)
25df93f Fixes. (eskimor, Apr 9, 2021)
Finish fetching logic.
eskimor committed Apr 7, 2021
commit 351a0d15ff334d58c5928284e1d39f2c706502fd
node/network/protocol/src/lib.rs (2 changes: 1 addition & 1 deletion)
@@ -317,7 +317,7 @@ pub mod v1 {
}

/// Data that makes a statement unique.
-#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
+#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Hash)]
pub struct StatementMetadata {
/// Relay parent this statement is relevant under.
pub relay_parent: Hash,
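A note on the `Hash` derive added above: it lets `StatementMetadata` participate in hash-based collections, which this commit starts leaning on (for instance, `available_peers` becomes a `HashMap` in the next file). A minimal sketch of what the derive enables, using hypothetical simplified field types rather than the real polkadot definitions:

```rust
use std::collections::HashSet;

// Stand-in for the real struct; only the derive set matters here.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StatementMetadata {
    pub relay_parent: [u8; 32],
    pub candidate_hash: [u8; 32],
    pub signed_by: u32,
}

fn main() {
    let meta = StatementMetadata {
        relay_parent: [0; 32],
        candidate_hash: [1; 32],
        signed_by: 7,
    };

    // With `Hash` derived, the metadata can live in hash-based collections,
    // e.g. a set used to deduplicate repeated `LargeStatement` announcements.
    let mut seen = HashSet::new();
    assert!(seen.insert(meta.clone())); // first announcement is new
    assert!(!seen.insert(meta));        // a repeat is detected as a duplicate
}
```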
node/network/statement-distribution/src/lib.rs (215 changes: 190 additions & 25 deletions)
@@ -39,6 +39,7 @@ mod requester;
use requester::{BuildStatementError, RequesterMessage, build_signed_full_statement, fetch};

const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
+const COST_FETCH_FAIL: Rep = Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed.");
const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
const COST_DUPLICATE_STATEMENT: Rep = Rep::CostMajorRepeated("Statement sent more than once by peer");
const COST_APPARENT_FLOOD: Rep = Rep::Malicious("Peer appears to be flooding us with statements");
@@ -384,7 +385,7 @@ enum LargeStatementStatus {
struct FetchingInfo {
/// All peers that send us a `LargeStatement` for the given `CandidateHash`, together with
/// their announced metadata.
-available_peers: Vec<(PeerId, StatementMetadata)>,
+available_peers: HashMap<PeerId, StatementMetadata>,
/// Peers left to try in case the background task needs it.
peers_to_try: Vec<PeerId>,
/// Sender for sending fresh peers to the fetching task in case of failure.
@@ -778,10 +779,11 @@ async fn handle_incoming_message_and_circulate<'a>(
message: protocol_v1::StatementDistributionMessage,
our_view: &View,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
-ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
-req_sender: mpsc::Sender<RequesterMessage>,
+ctx: &mut impl SubsystemContext,
+req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
) {
+let relay_parent = message.get_metadata().relay_parent;
let result = retrieve_statement_from_message(
peer,
message,
@@ -791,14 +793,14 @@
req_sender,
).await;

-if let Some(active_head, statement) = result {
+if let Some((active_head, statement)) = result {
handle_incoming_statement_and_circulate(
peer,
peers,
&*our_view,
active_head,
ctx,
-message.get_metadata().relay_parent,
+relay_parent,
statement,
metrics,
).await;
@@ -817,8 +819,8 @@ async fn retrieve_statement_from_message<'a>(
message: protocol_v1::StatementDistributionMessage,
our_view: &View,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
-ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
-req_sender: mpsc::Sender<RequesterMessage>,
+ctx: &mut impl SubsystemContext,
+req_sender: &mpsc::Sender<RequesterMessage>,
) -> Option<(&'a mut ActiveHeadData, SignedFullStatement)> {

let metadata = message.get_metadata();
@@ -863,7 +865,7 @@

match occupied.get_mut() {
LargeStatementStatus::Fetching(info) => {
-info.available_peers.push((peer, metadata));
+info.available_peers.insert(peer, metadata);
info.peers_to_try.push(peer);
// Answer any pending request for more peers:
if let Some(sender) = std::mem::take(&mut info.peer_sender) {
@@ -876,7 +878,9 @@
}
}
LargeStatementStatus::Fetched(committed) => {

let validator_id = active_head.validators.get(metadata.signed_by.0 as usize);

if let Some(validator_id) = validator_id {
let signing_context = SigningContext {
session_index: active_head.session_index,
@@ -906,7 +910,7 @@
validator_index = ?metadata.signed_by,
"Building statement failed - invalid signature!"
);
-new_status = launch_request(metadata, peer, req_sender, ctx).await;
+new_status = launch_request(metadata, peer, req_sender.clone(), ctx).await;
}
} else {
tracing::debug!(
@@ -941,15 +945,28 @@ async fn launch_request(
req_sender: mpsc::Sender<RequesterMessage>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
) -> Option<LargeStatementStatus> {
-let (task, handle) = fetch(meta, vec![peer], req_sender).remote_handle();

+let (task, handle) = fetch(
+meta.relay_parent,
+meta.candidate_hash,
+vec![peer],
+req_sender
+)
+.remote_handle();

let result = ctx.spawn("large-statement-fetcher", task.boxed())
.await;
if let Err(err) = result {
tracing::warn!(target: LOG_TARGET, ?err, "Spawning task failed.");
return None
}
+let available_peers = {
+let mut m = HashMap::new();
+m.insert(peer, meta);
+m
+};
Some(LargeStatementStatus::Fetching(FetchingInfo {
-available_peers: vec![peer],
+available_peers,
peers_to_try: Vec::new(),
peer_sender: None,
fetching_task: handle,
@@ -963,7 +980,7 @@ async fn handle_incoming_statement_and_circulate<'a>(
peers: &mut HashMap<PeerId, PeerData>,
our_view: &View,
active_head: &'a mut ActiveHeadData,
-ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
+ctx: &mut impl SubsystemContext,
relay_parent: Hash,
statement: SignedFullStatement,
metrics: &Metrics,
@@ -1105,7 +1122,7 @@ async fn handle_incoming_statement<'a>(
async fn update_peer_view_and_send_unlocked(
peer: PeerId,
peer_data: &mut PeerData,
-ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
+ctx: &mut impl SubsystemContext,
active_heads: &HashMap<Hash, ActiveHeadData>,
new_view: View,
metrics: &Metrics,
@@ -1136,12 +1153,12 @@
}
}

-#[tracing::instrument(level = "trace", skip(peers, active_heads, ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext,
our_view: &mut OurView,
+req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
) {
@@ -1174,6 +1191,7 @@ async fn handle_network_update(
&*our_view,
active_heads,
ctx,
+req_sender,
metrics,
).await;
}
@@ -1243,12 +1261,17 @@ impl StatementDistribution {
&mut peers,
&mut our_view,
&mut active_heads,
+&req_sender,
result?,
)
.await?,
Message::Requester(result) =>
self.handle_requester_messages(
&mut ctx,
+&mut peers,
+&mut our_view,
+&mut active_heads,
+&req_sender,
result.ok_or(SubsystemError::Context(
"Failed to read from requester receiver (stream finished)"
.to_string()
@@ -1266,25 +1289,41 @@ impl StatementDistribution {
async fn handle_requester_messages(
&self,
ctx: &mut impl SubsystemContext,
+peers: &mut HashMap<PeerId, PeerData>,
+our_view: &mut OurView,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
+req_sender: &mpsc::Sender<RequesterMessage>,
message: RequesterMessage,
) -> SubsystemResult<bool> {
match message {
-RequesterMessage::Finished {
-metadata,
+RequesterMessage::Verify {
+relay_parent,
+candidate_hash,
+from_peer,
response,
bad_peers,
-}=> {
-let active_head = match active_heads.get_mut(&metadata.relay_parent) {
+carry_on,
+} => {

+for bad in bad_peers {
+report_peer(ctx, bad, COST_FETCH_FAIL);
+}

+let active_head = match active_heads.get_mut(&relay_parent) {
Some(head) => head,
None => return Ok(false),
};

-let status = match active_head
+let status = active_head
.waiting_large_statements
-.remove(&metadata.candidate_hash) {
+.remove(&candidate_hash);

-Some(status) => status,
+let mut info = match status {
+Some(LargeStatementStatus::Fetching(info)) => info,
+Some(LargeStatementStatus::Fetched(_)) => {
+debug_assert!(false, "On status fetched, fetching task already succeeded. qed.");
+return Ok(false)
+}
None => {
tracing::warn!(
target: LOG_TARGET,
@@ -1293,21 +1332,145 @@ impl StatementDistribution {
return Ok(false)
}
};
-if let Some((from_peer, receipt)) = response {

+let metadata = match info.available_peers.remove(&from_peer) {
+Some(metadata) => metadata,
+None => {
+debug_assert!(false, "We insert all peers we start requester for. qed.");
+return Ok(false)
+}
+};

+let validator_id = active_head.validators.get(metadata.signed_by.0 as usize);

+if let Some(validator_id) = validator_id {
+let signing_context = SigningContext {
+session_index: active_head.session_index,
+parent_hash: metadata.relay_parent,
+};

+let statement = SignedFullStatement::new(
+Statement::Seconded(response.clone()),
+metadata.signed_by,
+metadata.signature.clone(),
+&signing_context,
+validator_id,
+);

+if let Some(statement) = statement {
+active_head.waiting_large_statements.insert(
+candidate_hash,
+LargeStatementStatus::Fetched(response),
+);
+handle_incoming_statement_and_circulate(
+from_peer,
+peers,
+&*our_view,
+active_head,
+ctx,
+relay_parent,
+statement,
+&self.metrics,
+).await;

+// Cache is now populated, resend all other messages:
+for (peer, metadata) in info.available_peers {
+let message =
+protocol_v1::StatementDistributionMessage::LargeStatement(
+metadata
+);
+handle_incoming_message_and_circulate(
+peer,
+peers,
+message,
+&*our_view,
+active_heads,
+ctx,
+req_sender,
+&self.metrics,
+)
+.await;
+}

+} else {
+// Peer sent us garbage data.
+tracing::debug!(
+target: LOG_TARGET,
+?from_peer,
+?metadata,
+"Data from peer did not match signature."
+);
+if let Err(_) = carry_on.send(()) {
+debug_assert!(
+false,
+"Requester does not terminate, before receiving the response. qed."
+);
+}
+// Put back status.
+active_head.waiting_large_statements.insert(
+candidate_hash,
+LargeStatementStatus::Fetching(info),
+);
+report_peer(ctx, from_peer, COST_INVALID_SIGNATURE);
+}
+} else {
+tracing::debug!(
+target: LOG_TARGET,
+validator_index = ?metadata.signed_by,
+"Error loading statement, could not find key for validator."
+);
+return Ok(false)
+}

}
RequesterMessage::SendRequest(req) => {
-ctx.send(
+ctx.send_message(
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
vec![req],
IfDisconnected::ImmediateError,
)
))
-.await?;
+.await;
}
-RequesterMessage::GetMorePeers => {
+RequesterMessage::GetMorePeers {
+relay_parent,
+candidate_hash,
+tx,
+} => {
+let active_head = match active_heads.get_mut(&relay_parent) {
+Some(head) => head,
+None => return Ok(false),
+};

+let status = active_head
+.waiting_large_statements
+.get_mut(&candidate_hash);

+let info = match status {
+Some(LargeStatementStatus::Fetching(info)) => info,
+Some(LargeStatementStatus::Fetched(_)) => {
+debug_assert!(false, "On status fetched, fetching task already succeeded. qed.");
+return Ok(false)
+}
+None => {
+tracing::warn!(
+target: LOG_TARGET,
+"Received get more peers event for non existent status - not supposed to happen."
+);
+return Ok(false)
+}
+};

+if info.peers_to_try.is_empty() {
+info.peer_sender = Some(tx);
+} else {
+let peers_to_try = std::mem::take(&mut info.peers_to_try);
+if let Err(peers) = tx.send(peers_to_try) {
+// No longer interested for now - might want them later:
+info.peers_to_try = peers;
}
}
}
}
Ok(false)
@@ -1319,6 +1482,7 @@ impl StatementDistribution {
peers: &mut HashMap<PeerId, PeerData>,
our_view: &mut OurView,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
+req_sender: &mpsc::Sender<RequesterMessage>,
message: FromOverseer<StatementDistributionMessage>,
) -> SubsystemResult<bool> {
let metrics = &self.metrics;
@@ -1399,6 +1563,7 @@
ctx,
our_view,
event,
+&req_sender,
metrics,
).await;
}
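For context on the `launch_request` change above: the requester future is split with `futures`' `remote_handle`, so the task can be spawned on the subsystem executor while `FetchingInfo` keeps a handle that yields the result and cancels the fetch when dropped. A minimal, self-contained sketch of that pattern (illustrative names and a local executor, not the subsystem's real API):

```rust
use futures::executor::LocalPool;
use futures::future::FutureExt; // for `remote_handle`
use futures::task::SpawnExt;    // for `spawn`

// Stand-in for the real network fetch performed by the requester task.
async fn fetch(candidate: u32) -> String {
    format!("candidate {} fetched", candidate)
}

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // Split the future into a runnable task plus a handle to its output,
    // mirroring `fetch(...).remote_handle()` in `launch_request`.
    let (task, handle) = fetch(42).remote_handle();

    // The subsystem hands `task` to `ctx.spawn(...)`; a local pool stands in here.
    spawner.spawn(task).expect("failed to spawn fetch task");

    // Holding `handle` (like `fetching_task` in `FetchingInfo`) lets the owner
    // await the result later; dropping the handle instead cancels the fetch.
    let result = pool.run_until(handle);
    assert_eq!(result, "candidate 42 fetched");
}
```

The same ownership idea is what `FetchingInfo::fetching_task` encodes: dropping the struct tears down the background fetch instead of leaking it.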