Skip to content

Commit 8918d64

Browse files
altonenbkchr
authored andcommitted
Move import queue out of sc-network (paritytech#12764)
* Move import queue out of `sc-network` Add supplementary asynchronous API for the import queue which means it can be run as an independent task and communicated with through the `ImportQueueService`. This commit removes removes block and justification imports from `sc-network` and provides `ChainSync` with a handle to import queue so it can import blocks and justifications. Polling of the import queue is moved complete out of `sc-network` and `sc_consensus::Link` is implemented for `ChainSyncInterfaceHandled` so the import queue can still influence the syncing process. * Fix tests * Apply review comments * Apply suggestions from code review Co-authored-by: Bastian Köcher <[email protected]> * Update client/network/sync/src/lib.rs Co-authored-by: Bastian Köcher <[email protected]> Co-authored-by: Bastian Köcher <[email protected]>
1 parent 3f7175d commit 8918d64

File tree

24 files changed

+716
-490
lines changed

24 files changed

+716
-490
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/consensus/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ futures = { version = "0.3.21", features = ["thread-pool"] }
1818
futures-timer = "3.0.1"
1919
libp2p = { version = "0.49.0", default-features = false }
2020
log = "0.4.17"
21+
mockall = "0.11.2"
2122
parking_lot = "0.12.1"
2223
serde = { version = "1.0", features = ["derive"] }
2324
thiserror = "1.0.30"

client/consensus/common/src/import_queue.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub type DefaultImportQueue<Block, Client> =
5353

5454
mod basic_queue;
5555
pub mod buffered_link;
56+
pub mod mock;
5657

5758
/// Shared block import struct used by the queue.
5859
pub type BoxBlockImport<B, Transaction> =
@@ -105,10 +106,10 @@ pub trait Verifier<B: BlockT>: Send + Sync {
105106
/// Blocks import queue API.
106107
///
107108
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
108-
/// Afterwards, call `poll_actions` to determine how to respond to these elements.
109-
pub trait ImportQueue<B: BlockT>: Send {
109+
pub trait ImportQueueService<B: BlockT>: Send {
110110
/// Import bunch of blocks.
111111
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
112+
112113
/// Import block justifications.
113114
fn import_justifications(
114115
&mut self,
@@ -117,12 +118,26 @@ pub trait ImportQueue<B: BlockT>: Send {
117118
number: NumberFor<B>,
118119
justifications: Justifications,
119120
);
120-
/// Polls for actions to perform on the network.
121-
///
121+
}
122+
123+
#[async_trait::async_trait]
124+
pub trait ImportQueue<B: BlockT>: Send {
125+
/// Get a copy of the handle to [`ImportQueueService`].
126+
fn service(&self) -> Box<dyn ImportQueueService<B>>;
127+
128+
/// Get a reference to the handle to [`ImportQueueService`].
129+
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
130+
122131
/// This method should behave in a way similar to `Future::poll`. It can register the current
123132
/// task and notify later when more actions are ready to be polled. To continue the comparison,
124133
/// it is as if this method always returned `Poll::Pending`.
125134
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);
135+
136+
/// Start asynchronous runner for import queue.
137+
///
138+
/// Takes an object implementing [`Link`] which allows the import queue to
139+
/// influece the synchronization process.
140+
async fn run(self, link: Box<dyn Link<B>>);
126141
}
127142

128143
/// Hooks that the verification queue can use to influence the synchronization

client/consensus/common/src/import_queue/basic_queue.rs

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,17 @@ use crate::{
3434
import_queue::{
3535
buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
3636
import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport,
37-
BoxJustificationImport, ImportQueue, IncomingBlock, Link, RuntimeOrigin, Verifier,
37+
BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link,
38+
RuntimeOrigin, Verifier,
3839
},
3940
metrics::Metrics,
4041
};
4142

4243
/// Interface to a basic block import queue that is importing blocks sequentially in a separate
4344
/// task, with plugable verification.
4445
pub struct BasicQueue<B: BlockT, Transaction> {
45-
/// Channel to send justification import messages to the background task.
46-
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
47-
/// Channel to send block import messages to the background task.
48-
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
46+
/// Handle for sending justification and block import messages to the background task.
47+
handle: BasicQueueHandle<B>,
4948
/// Results coming from the worker task.
5049
result_port: BufferedLinkReceiver<B>,
5150
_phantom: PhantomData<Transaction>,
@@ -54,8 +53,7 @@ pub struct BasicQueue<B: BlockT, Transaction> {
5453
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
5554
fn drop(&mut self) {
5655
// Flush the queue and close the receiver to terminate the future.
57-
self.justification_sender.close_channel();
58-
self.block_import_sender.close_channel();
56+
self.handle.close();
5957
self.result_port.close();
6058
}
6159
}
@@ -95,11 +93,37 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
9593
future.boxed(),
9694
);
9795

98-
Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
96+
Self {
97+
handle: BasicQueueHandle::new(justification_sender, block_import_sender),
98+
result_port,
99+
_phantom: PhantomData,
100+
}
99101
}
100102
}
101103

102-
impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
104+
#[derive(Clone)]
105+
struct BasicQueueHandle<B: BlockT> {
106+
/// Channel to send justification import messages to the background task.
107+
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
108+
/// Channel to send block import messages to the background task.
109+
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
110+
}
111+
112+
impl<B: BlockT> BasicQueueHandle<B> {
113+
pub fn new(
114+
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
115+
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
116+
) -> Self {
117+
Self { justification_sender, block_import_sender }
118+
}
119+
120+
pub fn close(&mut self) {
121+
self.justification_sender.close_channel();
122+
self.block_import_sender.close_channel();
123+
}
124+
}
125+
126+
impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
103127
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
104128
if blocks.is_empty() {
105129
return
@@ -138,12 +162,39 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
138162
}
139163
}
140164
}
165+
}
166+
167+
#[async_trait::async_trait]
168+
impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
169+
/// Get handle to [`ImportQueueService`].
170+
fn service(&self) -> Box<dyn ImportQueueService<B>> {
171+
Box::new(self.handle.clone())
172+
}
141173

174+
/// Get a reference to the handle to [`ImportQueueService`].
175+
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
176+
&mut self.handle
177+
}
178+
179+
/// Poll actions from network.
142180
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
143181
if self.result_port.poll_actions(cx, link).is_err() {
144182
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
145183
}
146184
}
185+
186+
/// Start asynchronous runner for import queue.
187+
///
188+
/// Takes an object implementing [`Link`] which allows the import queue to
189+
/// influece the synchronization process.
190+
async fn run(mut self, mut link: Box<dyn Link<B>>) {
191+
loop {
192+
if let Err(_) = self.result_port.next_action(&mut *link).await {
193+
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
194+
return
195+
}
196+
}
197+
}
147198
}
148199

149200
/// Messages destinated to the background worker.

client/consensus/common/src/import_queue/buffered_link.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl<B: BlockT> Clone for BufferedLinkSender<B> {
8080
}
8181

8282
/// Internal buffered message.
83-
enum BlockImportWorkerMsg<B: BlockT> {
83+
pub enum BlockImportWorkerMsg<B: BlockT> {
8484
BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
8585
JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, bool),
8686
RequestJustification(B::Hash, NumberFor<B>),
@@ -122,6 +122,18 @@ pub struct BufferedLinkReceiver<B: BlockT> {
122122
}
123123

124124
impl<B: BlockT> BufferedLinkReceiver<B> {
125+
/// Send action for the synchronization to perform.
126+
pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
127+
match msg {
128+
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
129+
link.blocks_processed(imported, count, results),
130+
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
131+
link.justification_imported(who, &hash, number, success),
132+
BlockImportWorkerMsg::RequestJustification(hash, number) =>
133+
link.request_justification(&hash, number),
134+
}
135+
}
136+
125137
/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
126138
/// passed as parameter.
127139
///
@@ -138,15 +150,17 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
138150
Poll::Pending => break Ok(()),
139151
};
140152

141-
match msg {
142-
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
143-
link.blocks_processed(imported, count, results),
144-
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
145-
link.justification_imported(who, &hash, number, success),
146-
BlockImportWorkerMsg::RequestJustification(hash, number) =>
147-
link.request_justification(&hash, number),
148-
}
153+
self.send_actions(msg, &mut *link);
154+
}
155+
}
156+
157+
/// Poll next element from import queue and send the corresponding action command over the link.
158+
pub async fn next_action(&mut self, link: &mut dyn Link<B>) -> Result<(), ()> {
159+
if let Some(msg) = self.rx.next().await {
160+
self.send_actions(msg, link);
161+
return Ok(())
149162
}
163+
Err(())
150164
}
151165

152166
/// Close the channel.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// This file is part of Substrate.
2+
3+
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
4+
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5+
6+
// This program is free software: you can redistribute it and/or modify
7+
// it under the terms of the GNU General Public License as published by
8+
// the Free Software Foundation, either version 3 of the License, or
9+
// (at your option) any later version.
10+
11+
// This program is distributed in the hope that it will be useful,
12+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
// GNU General Public License for more details.
15+
16+
// You should have received a copy of the GNU General Public License
17+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18+
19+
use super::*;
20+
21+
mockall::mock! {
22+
pub ImportQueueHandle<B: BlockT> {}
23+
24+
impl<B: BlockT> ImportQueueService<B> for ImportQueueHandle<B> {
25+
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
26+
fn import_justifications(
27+
&mut self,
28+
who: RuntimeOrigin,
29+
hash: B::Hash,
30+
number: NumberFor<B>,
31+
justifications: Justifications,
32+
);
33+
}
34+
}
35+
36+
mockall::mock! {
37+
pub ImportQueue<B: BlockT> {}
38+
39+
#[async_trait::async_trait]
40+
impl<B: BlockT> ImportQueue<B> for ImportQueue<B> {
41+
fn service(&self) -> Box<dyn ImportQueueService<B>>;
42+
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
43+
fn poll_actions<'a>(&mut self, cx: &mut futures::task::Context<'a>, link: &mut dyn Link<B>);
44+
async fn run(self, link: Box<dyn Link<B>>);
45+
}
46+
}

client/network/common/src/sync.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ pub mod warp;
2424

2525
use libp2p::PeerId;
2626
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
27-
use sc_consensus::{
28-
import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock,
29-
};
27+
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
3028
use sp_consensus::BlockOrigin;
3129
use sp_runtime::{
3230
traits::{Block as BlockT, NumberFor},
@@ -317,6 +315,12 @@ pub trait ChainSync<Block: BlockT>: Send {
317315
response: BlockResponse<Block>,
318316
) -> Result<OnBlockData<Block>, BadPeer>;
319317

318+
/// Procss received block data.
319+
fn process_block_response_data(
320+
&mut self,
321+
blocks_to_import: Result<OnBlockData<Block>, BadPeer>,
322+
);
323+
320324
/// Handle a response from the remote to a justification request that we made.
321325
///
322326
/// `request` must be the original request that triggered `response`.
@@ -326,17 +330,6 @@ pub trait ChainSync<Block: BlockT>: Send {
326330
response: BlockResponse<Block>,
327331
) -> Result<OnBlockJustification<Block>, BadPeer>;
328332

329-
/// A batch of blocks have been processed, with or without errors.
330-
///
331-
/// Call this when a batch of blocks have been processed by the import
332-
/// queue, with or without errors.
333-
fn on_blocks_processed(
334-
&mut self,
335-
imported: usize,
336-
count: usize,
337-
results: Vec<(Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>, Block::Hash)>,
338-
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<Block>), BadPeer>>>;
339-
340333
/// Call this when a justification has been processed by the import queue,
341334
/// with or without errors.
342335
fn on_justification_import(
@@ -378,7 +371,7 @@ pub trait ChainSync<Block: BlockT>: Send {
378371
/// Call when a peer has disconnected.
379372
/// Canceled obsolete block request may result in some blocks being ready for
380373
/// import, so this functions checks for such blocks and returns them.
381-
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>;
374+
fn peer_disconnected(&mut self, who: &PeerId);
382375

383376
/// Return some key metrics.
384377
fn metrics(&self) -> Metrics;
@@ -395,7 +388,10 @@ pub trait ChainSync<Block: BlockT>: Send {
395388
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
396389
/// this function should be polled until it returns [`Poll::Pending`] to
397390
/// consume all pending events.
398-
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<Block>>;
391+
fn poll(
392+
&mut self,
393+
cx: &mut std::task::Context,
394+
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
399395

400396
/// Send block request to peer
401397
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);

0 commit comments

Comments
 (0)