Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ members = [

"node/messages",
"node/overseer",
"node/primitives",
"node/service",

"parachain/test-parachains",
Expand Down
4 changes: 4 additions & 0 deletions node/messages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ description = "Message types used by Subsystems"

[dependencies]
polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
polkadot-node-primitives = { path = "../primitives" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.5"
180 changes: 170 additions & 10 deletions node/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,17 @@
//!
//! Subsystems' APIs are defined separately from their implementation, leading to easier mocking.

use polkadot_primitives::Hash;
use futures::channel::{mpsc, oneshot};

use sc_network::{ObservedRole, ReputationChange, PeerId, config::ProtocolId};
use polkadot_primitives::{BlockNumber, Hash, Signature};
use polkadot_primitives::parachain::{
AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId,
SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex,
};
use polkadot_node_primitives::{
MisbehaviorReport, SignedStatement,
};

/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
Expand All @@ -35,24 +45,174 @@ pub enum OverseerSignal {
Conclude,
}

/// A message type used by the Validation Subsystem.
/// A notification of a new backed candidate.
#[derive(Debug)]
pub struct NewBackedCandidate(pub BackedCandidate);

/// Messages received by the Candidate Selection subsystem.
#[derive(Debug)]
pub enum CandidateSelectionMessage {
/// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
/// The hash is the relay parent.
Invalid(Hash, AbridgedCandidateReceipt),
}

/// Messages received by the Candidate Backing subsystem.
#[derive(Debug)]
pub enum ValidationSubsystemMessage {
ValidityAttestation,
pub enum CandidateBackingMessage {
/// Registers a stream listener for updates to the set of backable candidates that could be backed
/// in a child of the given relay-parent, referenced by its hash.
RegisterBackingWatcher(Hash, mpsc::Sender<NewBackedCandidate>),
/// Note that the Candidate Backing subsystem should second the given candidate in the context of the
/// given relay-parent (ref. by hash). This candidate must be validated.
Second(Hash, AbridgedCandidateReceipt),
/// Note a validator's statement about a particular candidate. Disagreements about validity must be escalated
/// to a broader check by Misbehavior Arbitration. Agreements are simply tallied until a quorum is reached.
Statement(Hash, SignedStatement),
}

/// A message type used by the CandidateBacking Subsystem.
/// Blanket error for validation failing.
#[derive(Debug)]
pub enum CandidateBackingSubsystemMessage {
RegisterBackingWatcher,
Second,
pub struct ValidationFailed;

/// Messages received by the Validation subsystem
#[derive(Debug)]
pub enum CandidateValidationMessage {
/// Validate a candidate, sending a side-channel response of valid or invalid.
///
/// Provide the relay-parent in whose context this should be validated, the full candidate receipt,
/// and the PoV.
Validate(
Hash,
AbridgedCandidateReceipt,
PoVBlock,
oneshot::Sender<Result<(), ValidationFailed>>,
),
}

/// Chain heads.
///
/// Up to `N` (5?) chain heads.
pub struct View(pub Vec<Hash>);

/// Events from network.
pub enum NetworkBridgeEvent {
/// A peer has connected.
PeerConnected(PeerId, ObservedRole),

/// A peer has disconnected.
PeerDisconnected(PeerId),

/// Peer has sent a message.
PeerMessage(PeerId, Vec<u8>),

/// Peer's `View` has changed.
PeerViewChange(PeerId, View),

/// Our `View` has changed.
OurViewChange(View),
}

/// Messages received by the network bridge subsystem.
pub enum NetworkBridgeSubsystemMessage {
/// Register an event producer on startup.
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),

/// Report a peer for their actions.
ReportPeer(PeerId, ReputationChange),

/// Send a message to multiple peers.
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
}

/// Availability Distribution Message.
pub enum AvailabilityDistributionMessage {
/// Distribute an availability chunk to other validators.
DistributeChunk(Hash, ErasureChunk),

/// Fetch an erasure chunk from networking by candidate hash and chunk index.
FetchChunk(Hash, u32),

/// Event from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent),
}

/// Bitfield distribution message.
pub enum BitfieldDistributionMessage {
/// Distribute a bitfield via gossip to other validators.
DistributeBitfield(Hash, SignedAvailabilityBitfield),

/// Event from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent),
}

/// Availability store subsystem message.
pub enum AvailabilityStoreMessage {
/// Query a `PoVBlock` from the AV store.
QueryPoV(Hash, oneshot::Sender<Option<PoVBlock>>),

/// Query an `ErasureChunk` from the AV store.
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<ErasureChunk>),

/// Store an `ErasureChunk` in the AV store.
StoreChunk(Hash, ValidatorIndex, ErasureChunk),
}

/// A request to the Runtime API subsystem.
pub enum RuntimeApiRequest {
/// Get the current validator set.
Validators(oneshot::Sender<Vec<ValidatorId>>),
/// Get a signing context for bitfields and statements.
SigningContext(oneshot::Sender<SigningContext>),
/// Get the validation code for a specific para, assuming execution under given block number, and
/// an optional block number representing an intermediate parablock executed in the context of
/// that block.
ValidationCode(ParaId, BlockNumber, Option<BlockNumber>, oneshot::Sender<ValidationCode>),
}

/// A message to the Runtime API subsystem.
pub enum RuntimeApiMessage {
/// Make a request of the runtime API against the post-state of the given relay-parent.
Request(Hash, RuntimeApiRequest),
}

/// Statement distribution message.
pub enum StatementDistributionMessage {
/// We have originated a signed statement in the context of
/// given relay-parent hash and it should be distributed to other validators.
Share(Hash, SignedStatement),
}

/// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
pub enum ProvisionableData {
/// This bitfield indicates the availability of various candidate blocks.
Bitfield(Hash, SignedAvailabilityBitfield),
/// The Candidate Backing subsystem believes that this candidate is valid, pending availability.
BackedCandidate(BackedCandidate),
/// Misbehavior reports are self-contained proofs of validator misbehavior.
MisbehaviorReport(Hash, MisbehaviorReport),
/// Disputes trigger a broad dispute resolution process.
Dispute(Hash, Signature),
}

/// Message to the Provisioner.
///
/// In all cases, the Hash is that of the relay parent.
pub enum ProvisionerMessage {
/// This message allows potential block authors to be kept updated with all new authorship data
/// as it becomes available.
RequestBlockAuthorshipData(Hash, mpsc::Sender<ProvisionableData>),
/// This data should become part of a relay chain block
ProvisionableData(ProvisionableData),
}

/// A message type tying together all message types that are used across Subsystems.
#[derive(Debug)]
pub enum AllMessages {
Validation(ValidationSubsystemMessage),
CandidateBacking(CandidateBackingSubsystemMessage),
/// Message for the validation subsystem.
CandidateValidation(CandidateValidationMessage),
/// Message for the candidate backing subsystem.
CandidateBacking(CandidateBackingMessage),
}

/// A message type that a subsystem receives from an overseer.
Expand Down
29 changes: 20 additions & 9 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@

use std::time::Duration;
use futures::{
channel::oneshot,
pending, pin_mut, executor, select, stream,
FutureExt, StreamExt,
};
use futures_timer::Delay;
use kv_log_macro as log;

use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};

use messages::{
AllMessages, CandidateBackingSubsystemMessage, FromOverseer, ValidationSubsystemMessage
AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage
};

struct Subsystem1;

impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) {
async fn run(mut ctx: SubsystemContext<CandidateBackingMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
Expand All @@ -52,15 +54,24 @@ impl Subsystem1 {
}

Delay::new(Duration::from_secs(1)).await;
ctx.send_msg(AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
let (tx, _) = oneshot::channel();

ctx.send_msg(AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
PoVBlock {
block_data: BlockData(Vec::new()),
},
tx,
)
)).await.unwrap();
}
}
}

impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
Expand All @@ -70,7 +81,7 @@ impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
struct Subsystem2;

impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<ValidationSubsystemMessage>) {
async fn run(mut ctx: SubsystemContext<CandidateValidationMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
Expand All @@ -94,8 +105,8 @@ impl Subsystem2 {
}
}

impl Subsystem<ValidationSubsystemMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
Expand Down
Loading