Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
7a76b40
Move import queue out of `sc-network`
altonen Nov 21, 2022
1d1b955
Move stuff to SyncingEngine
altonen Nov 22, 2022
bd8f6a2
Move `ChainSync` instanation to `SyncingEngine`
altonen Nov 22, 2022
9569593
Move peer hashmap to `SyncingEngine`
altonen Nov 22, 2022
befeac3
Let `SyncingEngine` to implement `ChainSyncInterface`
altonen Nov 23, 2022
c642a33
Introduce `SyncStatusProvider`
altonen Nov 23, 2022
badfbf3
Move `sync_peer_(connected|disconnected)` to `SyncingEngine`
altonen Nov 23, 2022
755b47c
Implement `SyncEventStream`
altonen Nov 25, 2022
0b11339
Introduce `ChainSyncInterface`
altonen Nov 25, 2022
6f4ac98
Move event stream polling to `SyncingEngine`
altonen Nov 26, 2022
e2ea277
Make `SyncingEngine` into an asynchronous runner
altonen Nov 26, 2022
caf54b4
Fix warnings
altonen Nov 27, 2022
a920a7e
Code refactoring
altonen Nov 27, 2022
5dd14e3
Use `SyncingService` for BEEFY
altonen Nov 28, 2022
83509c9
Use `SyncingService` for GRANDPA
altonen Nov 28, 2022
bb005b6
Remove call delegation from `NetworkService`
altonen Nov 28, 2022
e5d6c49
Remove `ChainSyncService`
altonen Nov 29, 2022
a4f5403
Remove `ChainSync` service tests
altonen Nov 29, 2022
f485d89
Merge remote-tracking branch 'origin/master' into import-queue-refact…
altonen Dec 1, 2022
2acb775
Refactor code
altonen Nov 29, 2022
b01586a
Merge branch 'import-queue-refactoring' into extract-syncing-from-sc-…
altonen Dec 1, 2022
0085220
Refactor code
altonen Dec 2, 2022
da14feb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Dec 12, 2022
ee39f7b
Merge remote-tracking branch 'origin/master' into extract-syncing
altonen Dec 30, 2022
a55a44d
Update client/finality-grandpa/src/communication/tests.rs
altonen Dec 30, 2022
9882a76
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 7, 2023
49240a2
Fix warnings
altonen Feb 8, 2023
e8c017d
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 17, 2023
17b6872
Apply review comments
altonen Feb 20, 2023
721cc49
Fix docs
altonen Feb 20, 2023
6851852
Fix test
altonen Feb 20, 2023
8eaa4cb
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 20, 2023
d84be72
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 21, 2023
4f7cf6e
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 22, 2023
e50fda8
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Feb 28, 2023
d8461c4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 1, 2023
e105fe5
cargo-fmt
altonen Mar 2, 2023
2f0d7d2
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
1f8d399
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 2, 2023
d21fffc
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
198a695
Update client/network/sync/src/engine.rs
altonen Mar 5, 2023
7c4babc
Add missing docs
altonen Mar 5, 2023
d028177
Refactor code
altonen Mar 5, 2023
efa7716
Merge remote-tracking branch 'origin/extract-syncing-from-sc-network'…
altonen Mar 5, 2023
800b4ae
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 5, 2023
242ee09
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
4f295d4
Merge remote-tracking branch 'origin/master' into extract-syncing-fro…
altonen Mar 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement SyncEventStream
Remove `SyncConnected`/`SyncDisconnected` events from
`NetworkEvenStream` and provide those events through
`ChainSyncInterface` instead.

Modify BEEFY/GRANDPA/transactions protocol and `NetworkGossip` to take
`SyncEventStream` object which they listen to for incoming sync peer
events.
  • Loading branch information
altonen committed Nov 25, 2022
commit 755b47cf25ea04e0a48b17aee05b36dfa6cf54a9
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
backend,
system_rpc_tx,
tx_handler_controller,
sync_service,
sync_service: sync_service.clone(),
config,
telemetry: telemetry.as_mut(),
})?;
Expand Down Expand Up @@ -321,6 +321,8 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
config: grandpa_config,
link: grandpa_link,
network,
// TODo(aaro): fix arc
sync: Arc::new(sync_service),
voting_rule: sc_finality_grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
shared_voter_state: SharedVoterState::empty(),
Expand Down
5 changes: 3 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use node_primitives::Block;
use sc_client_api::BlockBackend;
use sc_consensus_babe::{self, SlotProportion};
use sc_executor::NativeElseWasmExecutor;
use sc_network::NetworkService;
use sc_network::{ChainSyncInterface, NetworkService};
use sc_network_common::{protocol::event::Event, service::NetworkEventStream};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
Expand Down Expand Up @@ -393,7 +393,7 @@ pub fn new_full_base(
task_manager: &mut task_manager,
system_rpc_tx,
tx_handler_controller,
sync_service,
sync_service: sync_service.clone(),
telemetry: telemetry.as_mut(),
})?;

Expand Down Expand Up @@ -533,6 +533,7 @@ pub fn new_full_base(
config,
link: grandpa_link,
network: network.clone(),
sync: Arc::new(sync_service),
telemetry: telemetry.as_ref().map(|x| x.handle()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
prometheus_registry,
Expand Down
15 changes: 12 additions & 3 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::ProtocolName;
use sc_network_common::service::NetworkRequest;
use sc_network_common::{service::NetworkRequest, sync::SyncEventStream};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
Expand Down Expand Up @@ -168,6 +168,8 @@ where
pub struct BeefyNetworkParams<B: Block, N> {
/// Network implementing gossip, requests and sync-oracle.
pub network: Arc<N>,
/// Syncing service implementing event stream for peers.
pub sync: Arc<dyn SyncEventStream>,
/// Chain specific BEEFY gossip protocol name. See
/// [`communication::beefy_protocol_name::gossip_protocol_name`].
pub gossip_protocol_name: ProtocolName,
Expand Down Expand Up @@ -228,14 +230,20 @@ where
on_demand_justifications_handler,
} = beefy_params;

let BeefyNetworkParams { network, gossip_protocol_name, justifications_protocol_name, .. } =
network_params;
let BeefyNetworkParams {
network,
sync,
gossip_protocol_name,
justifications_protocol_name,
..
} = network_params;

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator =
Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name,
gossip_validator.clone(),
None,
Expand Down Expand Up @@ -285,6 +293,7 @@ where
backend,
payload_provider,
network,
sync,
key_store: key_store.into(),
known_peers,
gossip_engine,
Expand Down
6 changes: 6 additions & 0 deletions client/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl BeefyTestNet {
});
}

pub(crate) fn block_until_sync_connected(&mut self) {
todo!();
}

pub(crate) fn generate_blocks_and_sync(
&mut self,
count: usize,
Expand Down Expand Up @@ -342,6 +346,7 @@ where

let network_params = crate::BeefyNetworkParams {
network: peer.network_service().clone(),
sync: peer.sync_service().clone(),
gossip_protocol_name: beefy_gossip_proto_name(),
justifications_protocol_name: on_demand_justif_handler.protocol_name(),
_phantom: PhantomData,
Expand Down Expand Up @@ -954,6 +959,7 @@ fn test_voter_init_setup(
Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
let mut gossip_engine = sc_network_gossip::GossipEngine::new(
net.peer(0).network_service().clone(),
net.peer(0).sync_service().clone(),
"/beefy/whatever",
gossip_validator,
None,
Expand Down
37 changes: 24 additions & 13 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, Header
use sc_network_common::{
protocol::event::Event as NetEvent,
service::{NetworkEventStream, NetworkRequest},
sync::{SyncEvent, SyncEventStream},
};
use sc_network_gossip::GossipEngine;

Expand Down Expand Up @@ -252,6 +253,7 @@ pub(crate) struct WorkerParams<B: Block, BE, P, R, N> {
pub backend: Arc<BE>,
pub payload_provider: P,
pub network: N,
pub sync: Arc<dyn SyncEventStream>,
pub key_store: BeefyKeystore,
pub known_peers: Arc<Mutex<KnownPeers<B>>>,
pub gossip_engine: GossipEngine<B>,
Expand Down Expand Up @@ -302,6 +304,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
backend: Arc<BE>,
payload_provider: P,
network: N,
sync: Arc<dyn SyncEventStream>,
key_store: BeefyKeystore,

// communication
Expand Down Expand Up @@ -346,6 +349,7 @@ where
payload_provider,
key_store,
network,
sync,
gossip_engine,
gossip_validator,
on_demand_justifications,
Expand All @@ -359,6 +363,7 @@ where
backend,
payload_provider,
network,
sync,
known_peers,
key_store,
gossip_engine,
Expand Down Expand Up @@ -794,7 +799,7 @@ where
) {
info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block());

let mut network_events = self.network.event_stream("network-gossip").fuse();
let mut sync_events = self.sync.event_stream("network-gossipzzz").fuse();
let mut votes = Box::pin(
self.gossip_engine
.messages_for(topic::<B>())
Expand Down Expand Up @@ -838,11 +843,11 @@ where
return;
},
// Keep track of connected peers.
net_event = network_events.next() => {
if let Some(net_event) = net_event {
self.handle_network_event(net_event);
sync_event = sync_events.next() => {
if let Some(sync_event) = sync_event {
self.handle_sync_event(sync_event);
} else {
error!(target: "beefy", "🥩 Network events stream terminated, closing worker.");
error!(target: "beefy", "🥩 Syncing events stream terminated, closing worker.");
return;
}
},
Expand Down Expand Up @@ -897,16 +902,14 @@ where
}

/// Update known peers based on network events.
fn handle_network_event(&mut self, event: NetEvent) {
fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
NetEvent::SyncConnected { remote } => {
SyncEvent::PeerConnected(remote) => {
self.known_peers.lock().add_new(remote);
},
NetEvent::SyncDisconnected { remote } => {
SyncEvent::PeerDisconnected(remote) => {
self.known_peers.lock().remove(&remote);
},
// We don't care about other events.
_ => (),
}
}
}
Expand Down Expand Up @@ -982,7 +985,8 @@ pub(crate) mod tests {
use beefy_primitives::{known_payloads, mmr::MmrRootProvider};
use futures::{executor::block_on, future::poll_fn, task::Poll};
use sc_client_api::{Backend as BackendT, HeaderBackend};
use sc_network::NetworkService;
use sc_network::{ChainSyncInterface, NetworkService};
use sc_network_common::sync::SyncEventStream;
use sc_network_test::TestNetFactory;
use sp_api::HeaderT;
use sp_blockchain::Backend as BlockchainBackendT;
Expand Down Expand Up @@ -1050,10 +1054,16 @@ pub(crate) mod tests {
let backend = peer.client().as_backend();
let api = Arc::new(TestApi {});
let network = peer.network_service().clone();
let sync = peer.sync_service().clone();
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
let gossip_validator = Arc::new(GossipValidator::new(known_peers.clone()));
let gossip_engine =
GossipEngine::new(network.clone(), "/beefy/1", gossip_validator.clone(), None);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
"/beefy/1",
gossip_validator.clone(),
None,
);
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
api.clone(),
Expand All @@ -1080,6 +1090,7 @@ pub(crate) mod tests {
gossip_validator,
metrics: None,
network,
sync,
on_demand_justifications,
persisted_state,
};
Expand Down
7 changes: 6 additions & 1 deletion client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ use crate::{
use gossip::{
FullCatchUpMessage, FullCommitMessage, GossipMessage, GossipValidator, PeerReport, VoteMessage,
};
use sc_network_common::service::{NetworkBlock, NetworkSyncForkRequest};
use sc_network_common::{
service::{NetworkBlock, NetworkSyncForkRequest},
sync::SyncEventStream,
};
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_finality_grandpa::{AuthorityId, AuthoritySignature, RoundNumber, SetId as SetIdNumber};

Expand Down Expand Up @@ -234,6 +237,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// service taken from the VoterSetState.
pub(crate) fn new(
service: N,
sync: Arc<dyn SyncEventStream>,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
prometheus_registry: Option<&Registry>,
Expand All @@ -246,6 +250,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let validator = Arc::new(validator);
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(),
sync.clone(),
protocol,
validator.clone(),
prometheus_registry,
Expand Down
24 changes: 23 additions & 1 deletion client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use sc_network_common::{
NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers,
NetworkSyncForkRequest, NotificationSender, NotificationSenderError,
},
sync::{SyncEvent as SyncStreamEvent, SyncEventStream},
};
use sc_network_gossip::Validator;
use sc_network_test::{Block, Hash};
Expand Down Expand Up @@ -190,6 +191,24 @@ impl sc_network_gossip::ValidatorContext<Block> for TestNetwork {
fn send_topic(&mut self, _: &PeerId, _: Hash, _: bool) {}
}

pub(crate) enum SyncEvent {
EventStream(TracingUnboundedSender<SyncStreamEvent>),
}

#[derive(Clone)]
pub(crate) struct TestSync {
sender: TracingUnboundedSender<SyncEvent>,
}

impl SyncEventStream for TestSync {
fn event_stream(
&self,
_name: &'static str,
) -> Pin<Box<dyn Stream<Item = SyncStreamEvent> + Send>> {
Box::pin(futures::stream::pending())
}
}

pub(crate) struct Tester {
pub(crate) net_handle: super::NetworkBridge<Block, TestNetwork>,
gossip_validator: Arc<GossipValidator<Block>>,
Expand Down Expand Up @@ -259,6 +278,8 @@ fn voter_set_state() -> SharedVoterSetState<Block> {
pub(crate) fn make_test_network() -> (impl Future<Output = Tester>, TestNetwork) {
let (tx, rx) = tracing_unbounded("test");
let net = TestNetwork { sender: tx };
let (stx, srx) = tracing_unbounded("sync");
let sync = Arc::new(TestSync { sender: stx });

#[derive(Clone)]
struct Exit;
Expand All @@ -271,7 +292,8 @@ pub(crate) fn make_test_network() -> (impl Future<Output = Tester>, TestNetwork)
}
}

let bridge = super::NetworkBridge::new(net.clone(), config(), voter_set_state(), None, None);
let bridge =
super::NetworkBridge::new(net.clone(), sync, config(), voter_set_state(), None, None);

(
futures::future::ready(Tester {
Expand Down
6 changes: 5 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use sc_client_api::{
StorageProvider, TransactionFor,
};
use sc_consensus::BlockImport;
use sc_network_common::protocol::ProtocolName;
use sc_network_common::{protocol::ProtocolName, sync::SyncEventStream};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -673,6 +673,8 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
/// `sc_network` crate, it is assumed that the Grandpa notifications protocol has been passed
/// to the configuration of the networking. See [`grandpa_peers_set_config`].
pub network: N,
/// Event stream for syncing-related events.
pub sync: Arc<dyn SyncEventStream>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// The prometheus metrics registry.
Expand Down Expand Up @@ -724,6 +726,7 @@ where
mut config,
link,
network,
sync,
voting_rule,
prometheus_registry,
shared_voter_state,
Expand All @@ -748,6 +751,7 @@ where

let network = NetworkBridge::new(
network,
sync,
config.clone(),
persistent_data.set_state.clone(),
prometheus_registry.as_ref(),
Expand Down
3 changes: 3 additions & 0 deletions client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use futures::prelude::*;
use log::{debug, info, warn};

use sc_client_api::backend::Backend;
use sc_network_common::sync::SyncEventStream;
use sc_telemetry::TelemetryHandle;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain::HeaderMetadata;
Expand Down Expand Up @@ -167,6 +168,7 @@ pub fn run_grandpa_observer<BE, Block: BlockT, Client, N, SC>(
config: Config,
link: LinkHalf<Block, Client, SC>,
network: N,
sync: Arc<dyn SyncEventStream>,
) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
where
BE: Backend<Block> + Unpin + 'static,
Expand All @@ -186,6 +188,7 @@ where

let network = NetworkBridge::new(
network,
sync,
config.clone(),
persistent_data.set_state.clone(),
None,
Expand Down
Loading