Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d0540a7
Introduce sensible weight constants (#12868)
KiChjang Dec 8, 2022
02a9dea
Checkout to the branch HEAD explicitly in `build-linux-substrate` (#1…
rcny Dec 8, 2022
9a0644c
cli: Improve pruning documentation (#12819)
lexnv Dec 8, 2022
e6bbc53
Revert "Move LockableCurrency trait to fungibles::Lockable and deprec…
tonyalaribe Dec 9, 2022
90ab4fa
Don't indefinitely block on shutting down Tokio (#12885)
bkchr Dec 9, 2022
47bd959
General Message Queue Pallet (#12485)
gavofyork Dec 9, 2022
f0b6e79
zombienet timings adjusted (#12890)
michalkucharczyk Dec 9, 2022
9931220
Move import queue out of `sc-network` (#12764)
altonen Dec 9, 2022
15cfd9c
Trace response payload in default `jsonrpsee` middleware (#12886)
tgmichel Dec 9, 2022
33e6029
Ensure that we inform all tasks to stop before starting the 60 second…
bkchr Dec 10, 2022
2f0d59d
Safe desired targets call (#12826)
Ank4n Dec 10, 2022
0ba5206
Fix typo (#12900)
ltfschoen Dec 11, 2022
9772209
ValidateUnsigned: Improve docs. (#12870)
bkchr Dec 11, 2022
06090ab
rpc server with HTTP/WS on the same socket (#12663)
niklasad1 Dec 12, 2022
e5d5d88
`pallet-message-queue`: Fix license (#12895)
ggwpez Dec 12, 2022
b3d9f3c
Use explicit call indices (#12891)
ggwpez Dec 12, 2022
f3c95e6
Pin canonincalized block (#12902)
arkpar Dec 12, 2022
d4837cb
Remove implicit approval chilling upon slash. (#12420)
kianenigma Dec 12, 2022
2a0eeff
bounties calls docs fix (#12909)
muharem Dec 12, 2022
01efa85
pallet-contracts migration pre-upgrade fix for v8 (#12905)
Dinonard Dec 13, 2022
13664c3
use custom environment for publishing crates (#12912)
joao-paulo-parity Dec 13, 2022
93fa104
[contracts] Add debug buffer limit + enforcement (#12845)
agryaznov Dec 13, 2022
c4fbb12
Merge remote-tracking branch 'origin/master' into HEAD
lexnv Dec 13, 2022
dfd9af0
frame/remote-externalities: Fix clippy
lexnv Dec 13, 2022
1f40715
frame/rpc: Add previous export
lexnv Dec 13, 2022
89498c0
Fixup some wrong dependencies (#12899)
bkchr Dec 13, 2022
b65c9f0
add numerator and denominator to Rational128 Debug impl and increase …
apopiak Dec 14, 2022
59b5903
Fix state-db pinning (#12927)
arkpar Dec 14, 2022
2e21c35
[ci] add job switcher (#12922)
alvicsam Dec 14, 2022
4de625c
Merge remote-tracking branch 'origin/master' into lexnv/kiz-revamp-tr…
lexnv Dec 14, 2022
2f6105b
Use LOG_TARGET in consensus related crates (#12875)
davxy Dec 14, 2022
5efd759
Merge branch 'master' of github.com:paritytech/substrate into lexnv/k…
kianenigma Dec 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move import queue out of sc-network (#12764)
* Move import queue out of `sc-network`

Add supplementary asynchronous API for the import queue which means
it can be run as an independent task and communicated with through
the `ImportQueueService`.

This commit removes removes block and justification imports from
`sc-network` and provides `ChainSync` with a handle to import queue so
it can import blocks and justifications. Polling of the import queue is
moved complete out of `sc-network` and `sc_consensus::Link` is
implemented for `ChainSyncInterfaceHandled` so the import queue
can still influence the syncing process.

* Fix tests

* Apply review comments

* Apply suggestions from code review

Co-authored-by: Bastian KΓΆcher <[email protected]>

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian KΓΆcher <[email protected]>

Co-authored-by: Bastian KΓΆcher <[email protected]>
  • Loading branch information
altonen and bkchr authored Dec 9, 2022
commit 9931220910f9fb65227fe4571842f800d61c7b95
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ futures = { version = "0.3.21", features = ["thread-pool"] }
futures-timer = "3.0.1"
libp2p = { version = "0.49.0", default-features = false }
log = "0.4.17"
mockall = "0.11.2"
parking_lot = "0.12.1"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0.30"
Expand Down
23 changes: 19 additions & 4 deletions client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub type DefaultImportQueue<Block, Client> =

mod basic_queue;
pub mod buffered_link;
pub mod mock;

/// Shared block import struct used by the queue.
pub type BoxBlockImport<B, Transaction> =
Expand Down Expand Up @@ -105,10 +106,10 @@ pub trait Verifier<B: BlockT>: Send + Sync {
/// Blocks import queue API.
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
/// Afterwards, call `poll_actions` to determine how to respond to these elements.
pub trait ImportQueue<B: BlockT>: Send {
pub trait ImportQueueService<B: BlockT>: Send {
/// Import bunch of blocks.
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

/// Import block justifications.
fn import_justifications(
&mut self,
Expand All @@ -117,12 +118,26 @@ pub trait ImportQueue<B: BlockT>: Send {
number: NumberFor<B>,
justifications: Justifications,
);
/// Polls for actions to perform on the network.
///
}

#[async_trait::async_trait]
pub trait ImportQueue<B: BlockT>: Send {
/// Get a copy of the handle to [`ImportQueueService`].
fn service(&self) -> Box<dyn ImportQueueService<B>>;

/// Get a reference to the handle to [`ImportQueueService`].
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;

/// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`.
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influece the synchronization process.
async fn run(self, link: Box<dyn Link<B>>);
}

/// Hooks that the verification queue can use to influence the synchronization
Expand Down
69 changes: 60 additions & 9 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,17 @@ use crate::{
import_queue::{
buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport,
BoxJustificationImport, ImportQueue, IncomingBlock, Link, RuntimeOrigin, Verifier,
BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link,
RuntimeOrigin, Verifier,
},
metrics::Metrics,
};

/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// task, with plugable verification.
pub struct BasicQueue<B: BlockT, Transaction> {
/// Channel to send justification import messages to the background task.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
/// Handle for sending justification and block import messages to the background task.
handle: BasicQueueHandle<B>,
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
_phantom: PhantomData<Transaction>,
Expand All @@ -54,8 +53,7 @@ pub struct BasicQueue<B: BlockT, Transaction> {
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
// Flush the queue and close the receiver to terminate the future.
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
self.handle.close();
self.result_port.close();
}
}
Expand Down Expand Up @@ -95,11 +93,37 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
future.boxed(),
);

Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
Self {
handle: BasicQueueHandle::new(justification_sender, block_import_sender),
result_port,
_phantom: PhantomData,
}
}
}

impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
#[derive(Clone)]
struct BasicQueueHandle<B: BlockT> {
/// Channel to send justification import messages to the background task.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
}

impl<B: BlockT> BasicQueueHandle<B> {
pub fn new(
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) -> Self {
Self { justification_sender, block_import_sender }
}

pub fn close(&mut self) {
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
}
}

impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if blocks.is_empty() {
return
Expand Down Expand Up @@ -138,12 +162,39 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}
}
}
}

#[async_trait::async_trait]
impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
/// Get handle to [`ImportQueueService`].
fn service(&self) -> Box<dyn ImportQueueService<B>> {
Box::new(self.handle.clone())
}

/// Get a reference to the handle to [`ImportQueueService`].
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
&mut self.handle
}

/// Poll actions from network.
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
if self.result_port.poll_actions(cx, link).is_err() {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
}
}

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influece the synchronization process.
async fn run(mut self, mut link: Box<dyn Link<B>>) {
loop {
if let Err(_) = self.result_port.next_action(&mut *link).await {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
return
}
}
}
}

/// Messages destinated to the background worker.
Expand Down
32 changes: 23 additions & 9 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<B: BlockT> Clone for BufferedLinkSender<B> {
}

/// Internal buffered message.
enum BlockImportWorkerMsg<B: BlockT> {
pub enum BlockImportWorkerMsg<B: BlockT> {
BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, bool),
RequestJustification(B::Hash, NumberFor<B>),
Expand Down Expand Up @@ -122,6 +122,18 @@ pub struct BufferedLinkReceiver<B: BlockT> {
}

impl<B: BlockT> BufferedLinkReceiver<B> {
/// Send action for the synchronization to perform.
pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
match msg {
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
}
}

/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
/// passed as parameter.
///
Expand All @@ -138,15 +150,17 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
Poll::Pending => break Ok(()),
};

match msg {
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
}
self.send_actions(msg, &mut *link);
}
}

/// Poll next element from import queue and send the corresponding action command over the link.
pub async fn next_action(&mut self, link: &mut dyn Link<B>) -> Result<(), ()> {
if let Some(msg) = self.rx.next().await {
self.send_actions(msg, link);
return Ok(())
}
Err(())
}

/// Close the channel.
Expand Down
46 changes: 46 additions & 0 deletions client/consensus/common/src/import_queue/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This file is part of Substrate.

// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::*;

mockall::mock! {
pub ImportQueueHandle<B: BlockT> {}

impl<B: BlockT> ImportQueueService<B> for ImportQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
fn import_justifications(
&mut self,
who: RuntimeOrigin,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
);
}
}

mockall::mock! {
pub ImportQueue<B: BlockT> {}

#[async_trait::async_trait]
impl<B: BlockT> ImportQueue<B> for ImportQueue<B> {
fn service(&self) -> Box<dyn ImportQueueService<B>>;
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
fn poll_actions<'a>(&mut self, cx: &mut futures::task::Context<'a>, link: &mut dyn Link<B>);
async fn run(self, link: Box<dyn Link<B>>);
}
}
28 changes: 12 additions & 16 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ pub mod warp;

use libp2p::PeerId;
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use sc_consensus::{
import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand Down Expand Up @@ -317,6 +315,12 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Procss received block data.
fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<Block>, BadPeer>,
);

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand All @@ -326,17 +330,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockJustification<Block>, BadPeer>;

/// A batch of blocks have been processed, with or without errors.
///
/// Call this when a batch of blocks have been processed by the import
/// queue, with or without errors.
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>, Block::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<Block>), BadPeer>>>;

/// Call this when a justification has been processed by the import queue,
/// with or without errors.
fn on_justification_import(
Expand Down Expand Up @@ -378,7 +371,7 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>;
fn peer_disconnected(&mut self, who: &PeerId);

/// Return some key metrics.
fn metrics(&self) -> Metrics;
Expand All @@ -395,7 +388,10 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<Block>>;
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;

/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
Expand Down
Loading