
Commit 73f19b0

andresilva authored and gavofyork committed
Improve ondemand request dispatch (#1349)
* core: fix bug in ondemand dispatch: after a request was dispatched to the last peer, the dispatch would loop forever on subsequent requests that can't be fulfilled by any of the peers, since the last peer wasn't updated.
* core: try to dispatch all pending ondemand requests
1 parent 6fc50a3 commit 73f19b0
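
For readers following the change outside the diff, below is a minimal, self-contained sketch of the fixed dispatch loop under simplified assumptions. The free-standing dispatch function, the bare Request struct and the PeerId/BlockNumber aliases are inventions for this example only; the real logic lives in OnDemandCore::dispatch, which additionally sends each request through the network service and tracks it in active_peers. The two points where last_peer is refreshed, plus the unhandled_requests queue appended back onto pending_requests, are the essence of the commit.

// A minimal, self-contained sketch of the fixed dispatch loop, with assumed
// names: `dispatch`, `Request`, `PeerId` and `BlockNumber` are illustrative
// stand-ins, not the crate's API, and "can this peer serve the request" is
// reduced to a best-block comparison.

use std::collections::{HashMap, VecDeque};

type PeerId = u32;
type BlockNumber = u64;

struct Request {
	block: BlockNumber,
}

fn dispatch(
	idle_peers: &mut VecDeque<PeerId>,
	best_blocks: &HashMap<PeerId, BlockNumber>,
	pending_requests: &mut VecDeque<Request>,
) -> Vec<(PeerId, BlockNumber)> {
	let mut dispatched = Vec::new();
	// `last_peer` marks the end of one full pass over the idle peers for the
	// request at the front of the queue.
	let mut last_peer = idle_peers.back().cloned();
	let mut unhandled_requests = VecDeque::new();

	loop {
		let peer = match idle_peers.pop_front() {
			Some(peer) => peer,
			None => break,
		};

		let can_be_processed_by_peer = {
			let request = match pending_requests.front() {
				Some(r) => r,
				None => {
					// Nothing left to dispatch: return the peer and stop.
					idle_peers.push_front(peer);
					break;
				},
			};
			best_blocks.get(&peer).map_or(false, |best| *best >= request.block)
		};

		if !can_be_processed_by_peer {
			// Rotate the peer to the back of the idle queue and try the next one.
			idle_peers.push_back(peer);

			// Every idle peer has been tried and none can serve this request:
			// park it and refresh `last_peer` so the next request still gets a
			// full pass instead of spinning forever (the original bug).
			if Some(peer) == last_peer {
				let request = pending_requests.pop_front()
					.expect("front checked above; qed");
				unhandled_requests.push_back(request);
				last_peer = idle_peers.back().cloned();
			}

			continue;
		}

		// A peer is about to be consumed by this dispatch, so the sentinel has
		// to be refreshed here as well; forgetting this was the original bug.
		last_peer = idle_peers.back().cloned();

		let request = pending_requests.pop_front().expect("front checked above; qed");
		dispatched.push((peer, request.block));
	}

	// Requests nobody could serve go back to the pending queue for later.
	pending_requests.append(&mut unhandled_requests);
	dispatched
}

fn main() {
	// Mirrors the shape of the first regression test below: three peers, two
	// of them behind the requested block, and two identical header requests.
	let mut idle_peers: VecDeque<PeerId> = VecDeque::from(vec![1, 2, 3]);
	let best_blocks: HashMap<PeerId, BlockNumber> =
		HashMap::from([(1, 200), (2, 200), (3, 250)]);
	let mut pending = VecDeque::from(vec![Request { block: 250 }, Request { block: 250 }]);

	let dispatched = dispatch(&mut idle_peers, &best_blocks, &mut pending);

	// Only peer 3 can serve block 250; the second request is parked instead of
	// looping forever, and peers 1 and 2 stay idle.
	let expected: Vec<(PeerId, BlockNumber)> = vec![(3, 250)];
	assert_eq!(dispatched, expected);
	assert_eq!(pending.len(), 1);
	let expected_idle: VecDeque<PeerId> = VecDeque::from(vec![1, 2]);
	assert_eq!(idle_peers, expected_idle);
}

Running main mirrors the first regression test added below: one request is dispatched to the only peer whose best block is high enough, the other request stays pending, and the loop terminates instead of spinning on a request no peer can serve.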

File tree

1 file changed: +70 -5 lines


core/network/src/on_demand.rs

Lines changed: 70 additions & 5 deletions
@@ -413,16 +413,24 @@ impl<B, E> OnDemandCore<B, E> where
 			None => return,
 		};
 
-		let last_peer = self.idle_peers.back().cloned();
-		while !self.pending_requests.is_empty() {
+		let mut last_peer = self.idle_peers.back().cloned();
+		let mut unhandled_requests = VecDeque::new();
+
+		loop {
 			let peer = match self.idle_peers.pop_front() {
 				Some(peer) => peer,
-				None => return,
+				None => break,
 			};
 
 			// check if request can (optimistically) be processed by the peer
 			let can_be_processed_by_peer = {
-				let request = self.pending_requests.front().expect("checked in loop condition; qed");
+				let request = match self.pending_requests.front() {
+					Some(r) => r,
+					None => {
+						self.idle_peers.push_front(peer);
+						break;
+					},
+				};
 				let peer_best_block = self.best_blocks.get(&peer)
 					.expect("entries are inserted into best_blocks when peer is connected;
 						entries are removed from best_blocks when peer is disconnected;
@@ -436,19 +444,25 @@ impl<B, E> OnDemandCore<B, E> where
 
 				// we have enumerated all peers and noone can handle request
 				if Some(peer) == last_peer {
-					break;
+					let request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
+					unhandled_requests.push_back(request);
+					last_peer = self.idle_peers.back().cloned();
 				}
 
 				continue;
 			}
 
+			last_peer = self.idle_peers.back().cloned();
+
 			let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
 			request.timestamp = Instant::now();
 			trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
 
 			service.execute_in_context(|ctx| ctx.send_message(peer, request.message()));
 			self.active_peers.insert(peer, request);
 		}
+
+		self.pending_requests.append(&mut unhandled_requests);
 	}
 }
 
@@ -928,4 +942,55 @@ pub mod tests {
 		assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true));
 		assert_eq!(on_demand.core.lock().pending_requests.len(), 0);
 	}
+
+	#[test]
+	fn does_not_loop_forever_after_dispatching_request_to_last_peer() {
+		// this test is a regression for a bug where the dispatch function would
+		// loop forever after dispatching a request to the last peer, since the
+		// last peer was not updated
+		let (_x, on_demand) = dummy(true);
+		let queue = RwLock::new(VecDeque::new());
+		let mut network = TestIo::new(&queue, None);
+
+		on_demand.remote_header(RemoteHeaderRequest {
+			cht_root: Default::default(),
+			block: 250,
+			retry_count: None,
+		});
+		on_demand.remote_header(RemoteHeaderRequest {
+			cht_root: Default::default(),
+			block: 250,
+			retry_count: None,
+		});
+
+		on_demand.on_connect(1, Roles::FULL, 200);
+		on_demand.on_connect(2, Roles::FULL, 200);
+		on_demand.on_connect(3, Roles::FULL, 250);
+
+		assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
+		assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
+	}
+
+	#[test]
+	fn tries_to_send_all_pending_requests() {
+		let (_x, on_demand) = dummy(true);
+		let queue = RwLock::new(VecDeque::new());
+		let mut network = TestIo::new(&queue, None);
+
+		on_demand.remote_header(RemoteHeaderRequest {
+			cht_root: Default::default(),
+			block: 300,
+			retry_count: None,
+		});
+		on_demand.remote_header(RemoteHeaderRequest {
+			cht_root: Default::default(),
+			block: 250,
+			retry_count: None,
+		});
+
+		on_demand.on_connect(1, Roles::FULL, 250);
+
+		assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>().is_empty());
+		assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
+	}
 }
