Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
69ef3fa
client/network: Add scaffolding for finality req to use req resp
mxinden Oct 26, 2020
e8df535
client/network/src/finality_requests: Remove
mxinden Oct 26, 2020
3207c53
client/network/src/behaviour: Pass request id down to sync
mxinden Oct 29, 2020
e3ef56e
client/network: Use request response for block requests
mxinden Oct 29, 2020
544ebfe
client/network: Move handler logic into *_*_handler.rs
mxinden Oct 30, 2020
8abd813
client/network: Track ongoing finality requests in protocol.rs
mxinden Nov 2, 2020
8da90c5
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 2, 2020
45de155
client/network: Remove commented out finalization initialization
mxinden Nov 2, 2020
456568f
client/network: Add docs for request handlers
mxinden Nov 2, 2020
1a6b00e
client/network/finality_request_handler: Log errors
mxinden Nov 2, 2020
06dbd98
client/network/block_request_handler: Log errors
mxinden Nov 2, 2020
25e298f
client/network: Format
mxinden Nov 3, 2020
2ed6dd1
client/network: Handle block request failure
mxinden Nov 3, 2020
aacb507
protocols/network: Fix tests
mxinden Nov 3, 2020
1a72e0c
client/network/src/behaviour: Handle request sending errors
mxinden Nov 3, 2020
1385f5c
client/network: Move response handling into custom method
mxinden Nov 3, 2020
8fdf1bd
client/network/protocol: Handle block response errors
mxinden Nov 3, 2020
7460a21
client/network/protocol: Remove tracking of obsolete requests
mxinden Nov 4, 2020
f1e7691
client/network/protocol: Remove block request start time tracking
mxinden Nov 4, 2020
401a7f8
client/network/protocol: Refactor on_*_request_started
mxinden Nov 4, 2020
b375b84
client/network: Pass protocol config instead of protocol name
mxinden Nov 4, 2020
107aa83
client/network: Pass protocol config in tests
mxinden Nov 4, 2020
0535723
client/network/config: Document request response configs
mxinden Nov 6, 2020
7a3d71e
client/network/src/_request_handler: Document protocol config gen
mxinden Nov 6, 2020
64d2222
client/network/src/protocol: Document Peer request values
mxinden Nov 6, 2020
9560e32
client/network: Rework request response to always use oneshot
mxinden Nov 6, 2020
8b7b32d
client/network: Unified metric reporting for all request protocols
mxinden Nov 6, 2020
109fa3f
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 6, 2020
d6a6a52
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 11, 2020
1cd8a71
client/network: Move protobuf parsing into protocol.rs
mxinden Nov 11, 2020
d5ea27e
client/network/src/protocol: Return pending events after poll
mxinden Nov 11, 2020
9bfa9ca
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 16, 2020
006fd6e
client/network: Improve error handling and documentation
mxinden Nov 16, 2020
1bca347
client/network/behaviour: Remove outdated error types
mxinden Nov 18, 2020
2797595
Update client/network/src/block_request_handler.rs
wheresaddie Nov 23, 2020
aaa981c
Update client/network/src/finality_request_handler.rs
wheresaddie Nov 23, 2020
d1361f2
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 24, 2020
c19885e
client/network/protocol: Reduce reputation on timeout
mxinden Nov 25, 2020
81cadfc
client/network/protocol: Refine reputation changes
mxinden Nov 25, 2020
9236113
client/network/block_request_handler: Set and explain queue length
mxinden Nov 25, 2020
4c018f1
client/service: Deny block requests when light client
mxinden Nov 25, 2020
07c0d2f
client/service: Fix role matching
mxinden Nov 25, 2020
8c8bcb2
client: Enforce line width
mxinden Nov 25, 2020
9481e6d
client/network/request_responses: Fix unit tests
mxinden Nov 25, 2020
707c1bd
client/network: Expose time to build response via metrics
mxinden Nov 27, 2020
e05c1ad
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 27, 2020
df5907f
client/network/request_responses: Fix early connection closed error
mxinden Nov 30, 2020
4096019
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 30, 2020
24e14c8
Merge branch 'paritytech/master' into use-request-response
mxinden Nov 30, 2020
94a1a79
client/network/protocol: Fix line length
mxinden Dec 1, 2020
db776ca
Merge branch 'paritytech/master' into use-request-response
mxinden Dec 1, 2020
8e846e0
client/network/protocol: Disconnect on most request failures
mxinden Dec 1, 2020
4c45b4b
Merge branch 'paritytech/master' into use-request-response
mxinden Dec 2, 2020
5277841
Merge branch 'paritytech/master' into use-request-response
mxinden Dec 10, 2020
bccc53e
Merge branch 'paritytech/master' into use-request-response
mxinden Dec 16, 2020
e0ae207
Merge branch 'paritytech/master' into use-request-response
mxinden Dec 18, 2020
d2c5161
client/network/protocol: Disconnect peer when oneshot is canceled
mxinden Jan 4, 2021
cc1bff0
Merge branch 'paritytech/master' into use-request-response
mxinden Jan 4, 2021
9acc7ee
Merge branch 'paritytech/master' into use-request-response
mxinden Jan 4, 2021
34d0351
client/network/protocol: Disconnect peer even when connection closed
mxinden Jan 4, 2021
2cb76ef
client/network/protocol: Remove debugging log line
mxinden Jan 5, 2021
d525b96
client/network/request_response: Use Clone::clone for error
mxinden Jan 5, 2021
1da9269
client/network/request_response: Remove outdated comment
mxinden Jan 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
client/network: Use request response for block requests
  • Loading branch information
mxinden committed Oct 29, 2020
commit e3ef56ed91532014159b11a8d353ffa68ed98d1c
121 changes: 85 additions & 36 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,27 +333,66 @@ Behaviour<B, H> {
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) =>
self.events.push_back(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)),
CustomMessageOutcome::BlockRequest { target, request } => {
match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => {
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_owned(),
});

let protobuf_req = crate::schema::v1::BlockRequest {
fields: request.fields.to_be_u32(),
from_block: match request.from {
message::FromBlock::Hash(h) =>
Some(crate::schema::v1::block_request::FromBlock::Hash(h.encode())),
message::FromBlock::Number(n) =>
Some(crate::schema::v1::block_request::FromBlock::Number(n.encode())),
},
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: target.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
});
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_owned(),
});
}
block_requests::SendRequestOutcome::NotConnected |
block_requests::SendRequestOutcome::EncodeError(_) => {},
to_block: request.to.map(|h| h.encode()).unwrap_or_default(),
direction: match request.direction {
message::Direction::Ascending => crate::schema::v1::Direction::Ascending as i32,
message::Direction::Descending => crate::schema::v1::Direction::Descending as i32,
},
max_blocks: request.max.unwrap_or(0),
};

let mut buf = Vec::with_capacity(protobuf_req.encoded_len());
if let Err(err) = protobuf_req.encode(&mut buf) {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
protobuf_req,
err
);
panic!();
// return SendRequestOutcome::EncodeError(err);
}

let request_id = self.request_responses.send_request(&target, "def", buf).unwrap();
// TODO: differentiate between the two ids.
self.substrate.on_block_request_started(target, request.id, request_id);






// TODO: Entirely ignoring any previous requests now. Fine?
// match self.block_requests.send_request(&target, request) {
// block_requests::SendRequestOutcome::Ok => {
// self.events.push_back(BehaviourOut::OpaqueRequestStarted {
// peer: target,
// protocol: self.block_requests.protocol_name().to_owned(),
// });
// },
// block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
// self.events.push_back(BehaviourOut::OpaqueRequestFinished {
// peer: target.clone(),
// protocol: self.block_requests.protocol_name().to_owned(),
// request_duration,
// });
// self.events.push_back(BehaviourOut::OpaqueRequestStarted {
// peer: target,
// protocol: self.block_requests.protocol_name().to_owned(),
// });
// }
// block_requests::SendRequestOutcome::NotConnected |
// block_requests::SendRequestOutcome::EncodeError(_) => {},
// }
},
CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => {
let protobuf_rq = crate::schema::v1::finality::FinalityProofRequest {
Expand Down Expand Up @@ -418,16 +457,25 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Even
});
}
request_responses::Event::RequestFinished { peer, protocol, request_id, result } => {
if protocol == "abc" {
let proof_response = crate::schema::v1::finality::FinalityProofResponse::decode(&result.unwrap()[..])
.unwrap();
let ev = self.substrate.on_finality_proof_response(peer, request_id, proof_response.proof);
self.inject_event(ev);
} else {
self.events.push_back(BehaviourOut::RequestFinished {
request_id,
result,
});
match protocol {
Cow::Borrowed("abc") => {
let proof_response = crate::schema::v1::finality::FinalityProofResponse::decode(&result.unwrap()[..])
.unwrap();
let ev = self.substrate.on_finality_proof_response(peer, request_id, proof_response.proof);
self.inject_event(ev);
}
Cow::Borrowed("def") => {
let protobuf_response = crate::schema::v1::BlockResponse::decode(&result.unwrap()[..]).unwrap();

let ev = self.substrate.on_block_response(peer, request_id, protobuf_response);
self.inject_event(ev);
}
_ => {
self.events.push_back(BehaviourOut::RequestFinished {
request_id,
result,
});
}
}
},
}
Expand All @@ -445,13 +493,14 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
});
},
block_requests::Event::Response { peer, response, request_duration } => {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
});
let ev = self.substrate.on_block_response(peer, response);
self.inject_event(ev);
panic!();
// self.events.push_back(BehaviourOut::OpaqueRequestFinished {
// peer: peer.clone(),
// protocol: self.block_requests.protocol_name().to_owned(),
// request_duration,
// });
// let ev = self.substrate.on_block_response(peer, response);
// self.inject_event(ev);
}
block_requests::Event::RequestCancelled { peer, request_duration, .. } |
block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
Expand Down
169 changes: 127 additions & 42 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,9 @@ struct PacketStats {
struct Peer<B: BlockT, H: ExHashT> {
info: PeerInfo<B>,
/// Current block request, if any.
block_request: Option<(Instant, message::BlockRequest<B>)>,
block_request: Option<(Instant, message::BlockRequest<B>, Option<libp2p::request_response::RequestId>)>,
/// Requests we are no longer interested in.
// TODO: Do we still need this at all?
obsolete_requests: HashMap<message::RequestId, Instant>,
/// Holds a set of transactions known to this peer.
known_transactions: LruHashSet<H>,
Expand Down Expand Up @@ -701,52 +702,120 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_block_response(
&mut self,
peer: PeerId,
response: message::BlockResponse<B>,
peer_id: PeerId,
request_id: libp2p::request_response::RequestId,
response: crate::schema::v1::BlockResponse,
) -> CustomMessageOutcome<B> {
let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
if p.obsolete_requests.remove(&response.id).is_some() {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
let peer = match self.context_data.peers.get_mut(&peer_id) {
Some(peer) => peer,
None => {
// TODO: Bring back.
// trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
return CustomMessageOutcome::None;
}
// Clear the request. If the response is invalid peer will be disconnected anyway.
match p.block_request.take() {
Some((_, request)) if request.id == response.id => request,
Some(_) => {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
return CustomMessageOutcome::None;
}
None => {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
return CustomMessageOutcome::None;
}
};

let block_request = match peer.block_request.take() {
Some(req) => req,
None => {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer_id);
self.behaviour.disconnect_peer(&peer_id);
self.peerset_handle.report_peer(peer_id, rep::UNEXPECTED_RESPONSE);
return CustomMessageOutcome::None;
}
} else {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
};

if block_request.2 != Some(request_id) {
// TODO: Properly handle this.
panic!("request id didn't match or was none");
return CustomMessageOutcome::None;
}

let block_request = block_request.1;

let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if block_request.fields.contains(message::BlockAttributes::BODY) {
Some(block_data.body.iter().map(|body| {
Decode::decode(&mut body.as_ref())
}).collect::<Result<Vec<_>, _>>()?)
} else {
None
},
receipt: if !block_data.message_queue.is_empty() {
Some(block_data.receipt)
} else {
None
},
message_queue: if !block_data.message_queue.is_empty() {
Some(block_data.message_queue)
} else {
None
},
justification: if !block_data.justification.is_empty() {
Some(block_data.justification)
} else if block_data.is_empty_justification {
Some(Vec::new())
} else {
None
},
})
}).collect::<Result<Vec<_>, codec::Error>>().unwrap();

let block_response = message::BlockResponse::<B> {
id: block_request.id,
blocks,
};

// let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
// if p.obsolete_requests.remove(&response.id).is_some() {
// trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
// return CustomMessageOutcome::None;
// }
// // Clear the request. If the response is invalid peer will be disconnected anyway.
// match p.block_request.take() {
// Some((_, request)) if request.id == response.id => request,
// Some(_) => {
// trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
// return CustomMessageOutcome::None;
// }
// None => {
// trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
// self.behaviour.disconnect_peer(&peer);
// self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
// return CustomMessageOutcome::None;
// }
// }
// } else {
// trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
// self.behaviour.disconnect_peer(&peer);
// self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
// return CustomMessageOutcome::None;
// };

let blocks_range = || match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
block_response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
response.id,
peer,
response.blocks.len(),
block_response.id,
peer_id,
block_response.blocks.len(),
blocks_range(),
);

if request.fields == message::BlockAttributes::JUSTIFICATION {
match self.sync.on_block_justification(peer, response) {
if block_request.fields == message::BlockAttributes::JUSTIFICATION {
match self.sync.on_block_justification(peer_id, block_response) {
Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) =>
CustomMessageOutcome::JustificationImport(peer, hash, number, justification),
Expand All @@ -758,26 +827,26 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
} else {
// Validate fields against the request.
if request.fields.contains(message::BlockAttributes::HEADER) && response.blocks.iter().any(|b| b.header.is_none()) {
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
if block_request.fields.contains(message::BlockAttributes::HEADER) && block_response.blocks.iter().any(|b| b.header.is_none()) {
self.behaviour.disconnect_peer(&peer_id);
self.peerset_handle.report_peer(peer_id, rep::BAD_RESPONSE);
trace!(target: "sync", "Missing header for a block");
return CustomMessageOutcome::None
}
if request.fields.contains(message::BlockAttributes::BODY) && response.blocks.iter().any(|b| b.body.is_none()) {
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
if block_request.fields.contains(message::BlockAttributes::BODY) && block_response.blocks.iter().any(|b| b.body.is_none()) {
self.behaviour.disconnect_peer(&peer_id);
self.peerset_handle.report_peer(peer_id, rep::BAD_RESPONSE);
trace!(target: "sync", "Missing body for a block");
return CustomMessageOutcome::None
}

match self.sync.on_block_data(&peer, Some(request), response) {
match self.sync.on_block_data(&peer_id, Some(block_request), block_response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, mut req)) => {
self.update_peer_request(&peer, &mut req);
self.update_peer_request(&peer_id, &mut req);
CustomMessageOutcome::BlockRequest {
target: peer,
target: peer_id,
request: req,
}
}
Expand Down Expand Up @@ -812,7 +881,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let mut aborting = Vec::new();
{
for (who, peer) in self.context_data.peers.iter() {
if peer.block_request.as_ref().map_or(false, |(t, _)| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
if peer.block_request.as_ref().map_or(false, |(t, _, _request_id)| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
log!(
target: "sync",
if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
Expand Down Expand Up @@ -1395,6 +1464,22 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.sync.on_finality_proof_request_started(who, block_hash, request_id)
}

pub fn on_block_request_started(
&mut self,
who: PeerId,
request_id: u64,
request_response_request_id: libp2p::request_response::RequestId,
) {
let peer = self.context_data.peers.get_mut(&who).unwrap();

if peer.block_request.as_mut().unwrap().1.id != request_id {
// TODO: Handle. likely fine. Some other request replaced the current one.
panic!();
}

peer.block_request.as_mut().unwrap().2 = Some(request_response_request_id);
}

fn format_stats(&self) -> String {
let mut out = String::new();
for (id, stats) in &self.context_data.stats {
Expand Down Expand Up @@ -1501,11 +1586,11 @@ fn update_peer_request<B: BlockT, H: ExHashT>(
if let Some(ref mut peer) = peers.get_mut(who) {
request.id = peer.next_request_id;
peer.next_request_id += 1;
if let Some((timestamp, request)) = peer.block_request.take() {
if let Some((timestamp, request, _request_id)) = peer.block_request.take() {
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
peer.obsolete_requests.insert(request.id, timestamp);
}
peer.block_request = Some((Instant::now(), request.clone()));
peer.block_request = Some((Instant::now(), request.clone(), None));
}
}

Expand Down