Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f263943
Move NetworkBridgeEvent to subsystem::messages.
eskimor Jan 17, 2021
3bb1e37
Request/response infrastructure is taking shape.
eskimor Jan 26, 2021
0bbd8c1
Multiplexer variant not supported by Rusts type system.
eskimor Jan 27, 2021
b30ba0f
request_response::request type checks.
eskimor Jan 27, 2021
0d23861
Cleanup.
eskimor Jan 27, 2021
1de47ac
Minor fixes for request_response.
eskimor Jan 28, 2021
92f47f2
Implement request sending + move multiplexer.
eskimor Jan 28, 2021
4e53e79
Remove obsolete multiplexer.
eskimor Jan 29, 2021
857ab9c
Initialize bridge with multiplexer.
eskimor Jan 29, 2021
1204faf
Finish generic request sending/receiving.
eskimor Jan 29, 2021
3ae3171
Doc update.
eskimor Jan 29, 2021
06f9966
Fixes.
eskimor Jan 29, 2021
a6508ac
Link issue for not yet implemented code.
eskimor Jan 29, 2021
5a718dd
Fixes suggested by @ordian - thanks!
eskimor Jan 29, 2021
d73b567
Update node/network/protocol/src/request_response/v1.rs
eskimor Jan 29, 2021
d018426
Update node/network/protocol/src/request_response.rs
eskimor Jan 29, 2021
49d7814
Fix existing tests.
eskimor Jan 30, 2021
b441b60
Better avoidance of division by zoro errors.
eskimor Jan 30, 2021
25cffbd
Doc fixes.
eskimor Jan 30, 2021
8c442a4
send_request -> start_request.
eskimor Feb 2, 2021
bc61c99
Fix missing renamings.
eskimor Feb 2, 2021
f89b2d6
Update substrate.
eskimor Feb 3, 2021
f37a4d3
Pass TryConnect instead of true.
eskimor Feb 2, 2021
36388b9
Actually import `IfDisconnected`.
eskimor Feb 2, 2021
ad85ebe
Fix wrong import.
eskimor Feb 3, 2021
c776c73
Update node/network/bridge/src/lib.rs
eskimor Feb 3, 2021
77ad3e4
Update node/network/bridge/src/multiplexer.rs
eskimor Feb 3, 2021
3c4eb2f
Stop doing tracing from within `From` instance.
eskimor Feb 3, 2021
b24ff8d
Get rid of redundant import.
eskimor Feb 3, 2021
dca2681
Formatting cleanup.
eskimor Feb 3, 2021
10320ce
Fix tests.
eskimor Feb 3, 2021
e0c8545
Add link to issue.
eskimor Feb 3, 2021
bc76e28
Clarify comments some more.
eskimor Feb 3, 2021
553b1e8
Fix tests.
eskimor Feb 3, 2021
7600e96
Formatting fix.
eskimor Feb 3, 2021
468ce7e
tabs
Feb 3, 2021
32cff58
Fix link
eskimor Feb 3, 2021
9369937
Use map_err.
eskimor Feb 3, 2021
3877c07
Improvements inspired by suggestions by @drahnr.
eskimor Feb 3, 2021
f39e040
Merge branch 'rk-req-resp-2306' of github.com:paritytech/polkadot int…
eskimor Feb 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 141 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ use polkadot_node_primitives::{
use polkadot_node_subsystem::{
messages::{
AllMessages, ApprovalDistributionMessage, ApprovalVotingMessage, NetworkBridgeMessage,
AssignmentCheckResult, ApprovalCheckResult,
AssignmentCheckResult, ApprovalCheckResult, NetworkBridgeEvent,
},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_network_protocol::{
PeerId, View, NetworkBridgeEvent, v1 as protocol_v1, ReputationChange as Rep,
PeerId, View, v1 as protocol_v1, ReputationChange as Rep,
};

const LOG_TARGET: &str = "approval_distribution";
Expand Down
13 changes: 11 additions & 2 deletions node/network/availability-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr};

use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View, OurView,
v1 as protocol_v1, PeerId, ReputationChange as Rep, View, OurView,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
Expand All @@ -42,7 +42,7 @@ use polkadot_primitives::v1::{
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent
};
use polkadot_subsystem::{
jaeger, errors::{ChainApiError, RuntimeApiError}, PerLeafSpan,
Expand Down Expand Up @@ -843,6 +843,15 @@ impl AvailabilityDistributionSubsystem {
);
}
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::AvailabilityFetchingRequest(_),
} => {
// TODO: Implement issue 2306:
tracing::warn!(
target: LOG_TARGET,
"To be implemented, see: https://github.com/paritytech/polkadot/issues/2306 !",
);
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: _,
deactivated: _,
Expand Down
3 changes: 2 additions & 1 deletion node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ use polkadot_subsystem::{
errors::RecoveryError,
messages::{
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
NetworkBridgeEvent,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, RequestId,
v1 as protocol_v1, PeerId, ReputationChange as Rep, RequestId,
};
use polkadot_node_subsystem_util::{
Timeout, TimeoutExt,
Expand Down
2 changes: 1 addition & 1 deletion node/network/availability-recovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use polkadot_primitives::v1::{
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, JaegerSpan};

type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;

Expand Down
6 changes: 3 additions & 3 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ use futures::{channel::oneshot, FutureExt};

use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
SubsystemResult,
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemResult,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange, OurView};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, View, ReputationChange, OurView};
use std::collections::{HashMap, HashSet};

const COST_SIGNATURE_INVALID: ReputationChange =
Expand Down
1 change: 1 addition & 0 deletions node/network/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sc-authority-discovery = { git = "https://github.com/paritytech/substrate", bran
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-network-protocol = { path = "../protocol" }
strum = "0.20.0"

[dev-dependencies]
assert_matches = "1.4.0"
Expand Down
72 changes: 51 additions & 21 deletions node/network/bridge/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, PeerId, ReputationChange,
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber};
use polkadot_subsystem::messages::NetworkBridgeMessage;
use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage};
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use sc_network::Event as NetworkEvent;

use polkadot_node_network_protocol::ObservedRole;
use polkadot_node_network_protocol::{request_response::Requests, ObservedRole};

use super::{WireMessage, LOG_TARGET, MALFORMED_MESSAGE_COST};
use super::multiplexer::RequestMultiplexError;
use super::{WireMessage, MALFORMED_MESSAGE_COST};

/// Internal type combining all actions a `NetworkBridge` might perform.
///
Expand All @@ -43,6 +44,9 @@ pub(crate) enum Action {
/// Ask network to send a collation message.
SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),

/// Ask network to send requests.
SendRequests(Vec<Requests>),

/// Ask network to connect to validators.
ConnectToValidators {
validator_ids: Vec<AuthorityDiscoveryId>,
Expand Down Expand Up @@ -76,21 +80,42 @@ pub(crate) enum Action {
Vec<WireMessage<protocol_v1::CollationProtocol>>,
),

Abort,
/// Send a message to another subsystem or the overseer.
///
/// Used for handling incoming requests.
SendMessage(AllMessages),

/// Abort with reason.
Abort(AbortReason),
Nop,
}

#[derive(Debug)]
pub(crate) enum AbortReason {
/// Received error from overseer:
SubsystemError(polkadot_subsystem::SubsystemError),
/// The stream of incoming events concluded.
EventStreamConcluded,
/// The stream of incoming requests concluded.
RequestStreamConcluded,
/// We received OverseerSignal::Conclude
OverseerConcluded,
}

impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>> for Action {
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn from(res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>) -> Self {
fn from(
res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
) -> Self {
match res {
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
Action::ActiveLeaves(active_leaves)
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
Action::BlockFinalized(number)
}
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
Action::Abort(AbortReason::OverseerConcluded)
}
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
Expand All @@ -99,6 +124,7 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
Action::SendCollationMessages(vec![(peers, msg)])
}
NetworkBridgeMessage::SendRequests(reqs) => Action::SendRequests(reqs),
NetworkBridgeMessage::SendValidationMessages(msgs) => {
Action::SendValidationMessages(msgs)
}
Expand All @@ -113,25 +139,15 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
connected,
},
},
Err(e) => {
tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
Action::Abort
}
Err(e) => Action::Abort(AbortReason::SubsystemError(e)),
}
}
}

impl From<Option<NetworkEvent>> for Action {
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn from(event: Option<NetworkEvent>) -> Action {
match event {
None => {
tracing::info!(
target: LOG_TARGET,
"Shutting down Network Bridge: underlying event stream concluded"
);
Action::Abort
}
None => Action::Abort(AbortReason::EventStreamConcluded),
Some(NetworkEvent::Dht(_))
| Some(NetworkEvent::SyncConnected { .. })
| Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
Expand All @@ -153,7 +169,9 @@ impl From<Option<NetworkEvent>> for Action {
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let v_messages: Result<Vec<_>, _> = messages
.iter()
.filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name())
.filter(|(protocol, _)| {
protocol == &PeerSet::Validation.into_protocol_name()
})
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();

Expand All @@ -164,7 +182,9 @@ impl From<Option<NetworkEvent>> for Action {

let c_messages: Result<Vec<_>, _> = messages
.iter()
.filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name())
.filter(|(protocol, _)| {
protocol == &PeerSet::Collation.into_protocol_name()
})
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();

Expand All @@ -182,3 +202,13 @@ impl From<Option<NetworkEvent>> for Action {
}
}
}

impl From<Option<Result<AllMessages, RequestMultiplexError>>> for Action {
fn from(event: Option<Result<AllMessages, RequestMultiplexError>>) -> Self {
match event {
None => Action::Abort(AbortReason::RequestStreamConcluded),
Some(Err(err)) => Action::ReportPeer(err.peer, MALFORMED_MESSAGE_COST),
Some(Ok(msg)) => Action::SendMessage(msg),
}
}
}
Loading