Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Basic request based collator side.
  • Loading branch information
eskimor committed Mar 12, 2021
commit 7845a4b8f77e4a88cf2fd7bff3464e33b63fd2d9
112 changes: 49 additions & 63 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ use super::{LOG_TARGET, Result};

use futures::{select, FutureExt, channel::oneshot};

use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash,
};
use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, CollatorId, CompressedPoV, CoreIndex, CoreState, Hash, Id as ParaId, PoV, ValidatorId};
use polkadot_subsystem::{
jaeger, PerLeafSpan,
FromOverseer, OverseerSignal, SubsystemContext,
messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, NetworkBridgeEvent},
};
use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, View, PeerId, RequestId, OurView,
};
use polkadot_node_network_protocol::{OurView, PeerId, View, peer_set::PeerSet, request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}}, v1 as protocol_v1};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
Expand Down Expand Up @@ -562,6 +558,44 @@ async fn process_msg(
);
}
},
CollationFetchingRequest(incoming) => {
let _span = state.span_per_relay_parent.get(&incoming.payload.relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == incoming.payload.para_id {
let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&incoming.payload.relay_parent) {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = %incoming.payload.relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);

return Ok(());
};

let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(ctx, state, incoming, receipt, pov).await;
} else {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %incoming.payload.para_id,
our_para_id = %our_para_id,
"received a `RequestCollation` for unexpected para_id",
);
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %incoming.payload.para_id,
"received a `RequestCollation` while not collating on any para",
);
}
}
}
}

Ok(())
Expand All @@ -572,15 +606,14 @@ async fn process_msg(
async fn send_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
request_id: RequestId,
origin: PeerId,
request: IncomingRequest<CollationFetchingRequest>,
receipt: CandidateReceipt,
pov: PoV,
) {
let pov = match protocol_v1::CompressedPoV::compress(&pov) {
let pov = match CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
tracing::error!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
Expand All @@ -589,15 +622,12 @@ async fn send_collation(
}
};

let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![origin],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;

if let Err(err) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
tracing::warn!(
target: LOG_TARGET,
"Sending collation response failed",
);
}
state.metrics.on_collation_sent();
}

Expand All @@ -624,50 +654,6 @@ async fn handle_incoming_peer_message(
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
}
RequestCollation(request_id, relay_parent, para_id) => {
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == para_id {
let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&relay_parent) {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);

return Ok(());
};

let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(ctx, state, request_id, origin, receipt, pov).await;
} else {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
our_para_id = %our_para_id,
"received a `RequestCollation` for unexpected para_id",
);
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
"received a `RequestCollation` while not collating on any para",
);
}
}
}
Collation(_, _, _) => {
tracing::warn!(
target: LOG_TARGET,
"Collation message is not expected on the collator side of the protocol",
);
}
CollationSeconded(statement) => {
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::warn!(
Expand Down