diff --git a/Cargo.lock b/Cargo.lock index 1aa1be64918b..9c36c903341e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4496,6 +4496,20 @@ dependencies = [ "sp-runtime", ] +[[package]] +name = "polkadot-node-bitfield-signing" +version = "0.1.0" +dependencies = [ + "bitvec", + "derive_more 0.99.9", + "futures 0.3.5", + "log 0.4.8", + "polkadot-node-subsystem", + "polkadot-primitives", + "sc-keystore", + "wasm-timer", +] + [[package]] name = "polkadot-node-core-backing" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4d6b40feb704..ae80771ada25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ members = [ "service", "validation", + "node/bitfield-signing", "node/core/proposer", "node/network/bridge", "node/network/pov-distribution", diff --git a/node/bitfield-signing/Cargo.toml b/node/bitfield-signing/Cargo.toml new file mode 100644 index 000000000000..c3a2934aaddd --- /dev/null +++ b/node/bitfield-signing/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "polkadot-node-bitfield-signing" +version = "0.1.0" +authors = ["Peter Goodspeed-Niklaus "] +edition = "2018" + +[dependencies] +bitvec = "0.17.4" +derive_more = "0.99.9" +futures = "0.3.5" +log = "0.4.8" +polkadot-primitives = { path = "../../primitives" } +polkadot-node-subsystem = { path = "../subsystem" } +keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" } +wasm-timer = "0.2.4" diff --git a/node/bitfield-signing/src/lib.rs b/node/bitfield-signing/src/lib.rs new file mode 100644 index 000000000000..b973f4b3ae09 --- /dev/null +++ b/node/bitfield-signing/src/lib.rs @@ -0,0 +1,290 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block.

use bitvec::bitvec;
use futures::{
    channel::{mpsc, oneshot},
    prelude::*,
    stream, Future,
};
use keystore::KeyStorePtr;
use polkadot_node_subsystem::{
    messages::{
        self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
        BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage,
    },
    util::{self, JobManager, JobTrait, ToJobTrait, Validator},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreOccupied, Hash};
use std::{convert::TryFrom, pin::Pin, time::Duration};
use wasm_timer::{Delay, Instant};

/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
///
/// The implementers' guide describes this as a fixed wait that gives availability
/// distribution the chance to make candidates available before the bitfield is built.
const JOB_DELAY: Duration = Duration::from_millis(1500);

/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent.
///
/// The type carries no state of its own; it exists to hang the `JobTrait` implementation on.
pub struct BitfieldSigningJob;

/// Messages which a `BitfieldSigningJob` is prepared to receive.
+pub enum ToJob { + BitfieldSigning(BitfieldSigningMessage), + Stop, +} + +impl ToJobTrait for ToJob { + const STOP: Self = ToJob::Stop; + + fn relay_parent(&self) -> Option { + match self { + Self::BitfieldSigning(bsm) => bsm.relay_parent(), + Self::Stop => None, + } + } +} + +impl TryFrom for ToJob { + type Error = (); + + fn try_from(msg: AllMessages) -> Result { + match msg { + AllMessages::BitfieldSigning(bsm) => Ok(ToJob::BitfieldSigning(bsm)), + _ => Err(()), + } + } +} + +impl From for ToJob { + fn from(bsm: BitfieldSigningMessage) -> ToJob { + ToJob::BitfieldSigning(bsm) + } +} + +/// Messages which may be sent from a `BitfieldSigningJob`. +pub enum FromJob { + AvailabilityStore(AvailabilityStoreMessage), + BitfieldDistribution(BitfieldDistributionMessage), + CandidateBacking(CandidateBackingMessage), + RuntimeApi(RuntimeApiMessage), +} + +impl From for AllMessages { + fn from(from_job: FromJob) -> AllMessages { + match from_job { + FromJob::AvailabilityStore(asm) => AllMessages::AvailabilityStore(asm), + FromJob::BitfieldDistribution(bdm) => AllMessages::BitfieldDistribution(bdm), + FromJob::CandidateBacking(cbm) => AllMessages::CandidateBacking(cbm), + FromJob::RuntimeApi(ram) => AllMessages::RuntimeApi(ram), + } + } +} + +impl TryFrom for FromJob { + type Error = (); + + fn try_from(msg: AllMessages) -> Result { + match msg { + AllMessages::AvailabilityStore(asm) => Ok(Self::AvailabilityStore(asm)), + AllMessages::BitfieldDistribution(bdm) => Ok(Self::BitfieldDistribution(bdm)), + AllMessages::CandidateBacking(cbm) => Ok(Self::CandidateBacking(cbm)), + AllMessages::RuntimeApi(ram) => Ok(Self::RuntimeApi(ram)), + _ => Err(()), + } + } +} + +/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`. 
+#[derive(Debug, derive_more::From)] +pub enum Error { + /// error propagated from the utility subsystem + #[from] + Util(util::Error), + /// io error + #[from] + Io(std::io::Error), + /// a one shot channel was canceled + #[from] + Oneshot(oneshot::Canceled), + /// a mspc channel failed to send + #[from] + MpscSend(mpsc::SendError), + /// several errors collected into one + #[from] + Multiple(Vec), +} + +// this function exists mainly to collect a bunch of potential error points into one. +async fn get_core_availability( + relay_parent: Hash, + idx: usize, + core: Option, + sender: &mpsc::Sender, +) -> Result { + use messages::{ + AvailabilityStoreMessage::QueryPoVAvailable, + RuntimeApiRequest::CandidatePendingAvailability, + }; + use FromJob::{AvailabilityStore, RuntimeApi}; + use RuntimeApiMessage::Request; + + // we have to (cheaply) clone this sender so we can mutate it to actually send anything + let mut sender = sender.clone(); + + // REVIEW: is it safe to ignore parathreads here, or do they also figure in the availability mapping? + if let Some(CoreOccupied::Parachain) = core { + let (tx, rx) = oneshot::channel(); + sender + .send(RuntimeApi(Request( + relay_parent, + CandidatePendingAvailability(idx.into(), tx), + ))) + .await?; + let committed_candidate_receipt = match rx.await? { + Some(ccr) => ccr, + None => return Ok(false), + }; + let (tx, rx) = oneshot::channel(); + sender + .send(AvailabilityStore(QueryPoVAvailable( + committed_candidate_receipt.descriptor.pov_hash, + tx, + ))) + .await?; + return rx.await.map_err(Into::into); + } + Ok(false) +} + +// the way this function works is not intuitive: +// +// - get the scheduler roster so we have a list of cores, in order. 
+// - for each occupied core, fetch `candidate_pending_availability` from runtime +// - from there, we can get the `CandidateDescriptor` +// - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_) +async fn construct_availability_bitfield( + relay_parent: Hash, + sender: &mut mpsc::Sender, +) -> Result { + use futures::lock::Mutex; + + use messages::RuntimeApiRequest::ValidatorGroups; + use FromJob::RuntimeApi; + use RuntimeApiMessage::Request; + + // request the validator groups so we can get the scheduler roster + let (tx, rx) = oneshot::channel(); + sender + .send(RuntimeApi(Request(relay_parent, ValidatorGroups(tx)))) + .await?; + + // we now need sender to be immutable so we can copy the reference to multiple concurrent closures + let sender = &*sender; + + // wait for the scheduler roster + let scheduler_roster = rx.await?; + + // prepare outputs + let out = + Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; scheduler_roster.availability_cores.len())); + // in principle, we know that we never want concurrent access to the _same_ bit within the vec; + // we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding + // any need to ever wait to lock this mutex. + // in practice, it's safer to just use the mutex, and speed optimizations should wait until + // benchmarking proves that they are necessary. + let out_ref = &out; + let errs = Mutex::new(Vec::new()); + let errs_ref = &errs; + + // Handle each (idx, core) pair concurrently + // + // In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why + // we need the mutexes and explicit references above. 
+ stream::iter(scheduler_roster.availability_cores.into_iter().enumerate()) + .for_each_concurrent(None, |(idx, core)| async move { + let availability = match get_core_availability(relay_parent, idx, core, sender).await { + Ok(availability) => availability, + Err(err) => { + errs_ref.lock().await.push(err); + return; + } + }; + out_ref.lock().await.set(idx, availability); + }) + .await; + + let errs = errs.into_inner(); + if errs.is_empty() { + Ok(out.into_inner().into()) + } else { + Err(errs.into()) + } +} + +impl JobTrait for BitfieldSigningJob { + type ToJob = ToJob; + type FromJob = FromJob; + type Error = Error; + type RunArgs = KeyStorePtr; + + const NAME: &'static str = "BitfieldSigningJob"; + + /// Run a job for the parent block indicated + fn run( + relay_parent: Hash, + keystore: Self::RunArgs, + _receiver: mpsc::Receiver, + mut sender: mpsc::Sender, + ) -> Pin> + Send>> { + async move { + // figure out when to wait to + let wait_until = Instant::now() + JOB_DELAY; + + // now do all the work we can before we need to wait for the availability store + // if we're not a validator, we can just succeed effortlessly + let validator = match Validator::new(relay_parent, keystore, sender.clone()).await { + Ok(validator) => validator, + Err(util::Error::NotAValidator) => return Ok(()), + Err(err) => return Err(Error::Util(err)), + }; + + // wait a bit before doing anything else + Delay::new_at(wait_until).await?; + + let bitfield = construct_availability_bitfield(relay_parent, &mut sender).await?; + let signed_bitfield = validator.sign(bitfield); + + // make an anonymous scope to contain some use statements to simplify creating the outbound message + { + use BitfieldDistributionMessage::DistributeBitfield; + use FromJob::BitfieldDistribution; + + sender + .send(BitfieldDistribution(DistributeBitfield( + relay_parent, + signed_bitfield, + ))) + .await + .map_err(Into::into) + } + } + .boxed() + } +} + +/// BitfieldSigningSubsystem manages a number of bitfield 
signing jobs. +pub type BitfieldSigningSubsystem = + JobManager; diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index 35ebf8f3367d..36440275cb5b 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -147,6 +147,7 @@ fn main() { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index c6ac6444c29e..cb158d09c129 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -77,7 +77,7 @@ use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, CandidateSelectionMessage, StatementDistributionMessage, - AvailabilityDistributionMessage, BitfieldDistributionMessage, + AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; @@ -339,6 +339,9 @@ pub struct Overseer { /// An availability distribution subsystem. availability_distribution_subsystem: OverseenSubsystem, + /// A bitfield signing subsystem. + bitfield_signing_subsystem: OverseenSubsystem, + /// A bitfield distribution subsystem. bitfield_distribution_subsystem: OverseenSubsystem, @@ -390,7 +393,7 @@ pub struct Overseer { /// /// [`Subsystem`]: trait.Subsystem.html /// [`DummySubsystem`]: struct.DummySubsystem.html -pub struct AllSubsystems { +pub struct AllSubsystems { /// A candidate validation subsystem. pub candidate_validation: CV, /// A candidate backing subsystem. 
@@ -401,6 +404,8 @@ pub struct AllSubsystems { pub statement_distribution: SD, /// An availability distribution subsystem. pub availability_distribution: AD, + /// A bitfield signing subsystem. + pub bitfield_signing: BS, /// A bitfield distribution subsystem. pub bitfield_distribution: BD, /// A provisioner subsystem. @@ -487,6 +492,7 @@ where /// candidate_selection: DummySubsystem, /// statement_distribution: DummySubsystem, /// availability_distribution: DummySubsystem, + /// bitfield_signing: DummySubsystem, /// bitfield_distribution: DummySubsystem, /// provisioner: DummySubsystem, /// pov_distribution: DummySubsystem, @@ -513,9 +519,9 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - all_subsystems: AllSubsystems, + all_subsystems: AllSubsystems, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> where @@ -524,6 +530,7 @@ where CS: Subsystem> + Send, SD: Subsystem> + Send, AD: Subsystem> + Send, + BS: Subsystem> + Send, BD: Subsystem> + Send, P: Subsystem> + Send, PoVD: Subsystem> + Send, @@ -575,6 +582,13 @@ where all_subsystems.availability_distribution, )?; + let bitfield_signing_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.bitfield_signing, + )?; + let bitfield_distribution_subsystem = spawn( &mut s, &mut running_subsystems, @@ -630,6 +644,7 @@ where candidate_selection_subsystem, statement_distribution_subsystem, availability_distribution_subsystem, + bitfield_signing_subsystem, bitfield_distribution_subsystem, provisioner_subsystem, pov_distribution_subsystem, @@ -871,6 +886,11 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + AllMessages::BitfieldSigning(msg) => { + if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication{ msg }).await; + } + } AllMessages::Provisioner(msg) => { if let Some(ref mut s) = self.provisioner_subsystem.instance { let _ = 
s.tx.send(FromOverseer::Communication { msg }).await; @@ -1050,6 +1070,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, @@ -1112,6 +1133,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, @@ -1227,6 +1249,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, @@ -1323,6 +1346,7 @@ mod tests { candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 6d5cdbda427e..46ba4d0e03fa 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -280,6 +280,7 @@ fn real_overseer( candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, bitfield_distribution: DummySubsystem, provisioner: DummySubsystem, pov_distribution: DummySubsystem, diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 2040b413488d..7f1c7f48fffc 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -26,7 +26,7 @@ use futures::channel::{mpsc, oneshot}; use polkadot_primitives::v1::{ BlockNumber, Hash, - CandidateReceipt, 
PoV, ErasureChunk, BackedCandidate, Id as ParaId, + CandidateReceipt, CommittedCandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, CoreAssignment, CoreOccupied, HeadData, CandidateDescriptor, ValidatorSignature, OmittedValidationData, @@ -220,12 +220,32 @@ impl BitfieldDistributionMessage { } } +/// Bitfield signing message. +/// +/// Currently non-instantiable. +#[derive(Debug)] +pub enum BitfieldSigningMessage {} + +impl BitfieldSigningMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option { + None + } +} + /// Availability store subsystem message. #[derive(Debug)] pub enum AvailabilityStoreMessage { /// Query a `PoV` from the AV store. QueryPoV(Hash, oneshot::Sender>), + /// Query whether a `PoV` exists within the AV Store. + /// + /// This is useful in cases like bitfield signing, when existence + /// matters, but we don't want to necessarily pass around multiple + /// megabytes of data to get a single bit of information. + QueryPoVAvailable(Hash, oneshot::Sender), + /// Query an `ErasureChunk` from the AV store. QueryChunk(Hash, ValidatorIndex, oneshot::Sender), @@ -238,6 +258,7 @@ impl AvailabilityStoreMessage { pub fn relay_parent(&self) -> Option { match self { Self::QueryPoV(hash, _) => Some(*hash), + Self::QueryPoVAvailable(hash, _) => Some(*hash), Self::QueryChunk(hash, _, _) => Some(*hash), Self::StoreChunk(hash, _, _) => Some(*hash), } @@ -272,6 +293,8 @@ pub enum RuntimeApiRequest { ValidationCode(ParaId, BlockNumber, Option, oneshot::Sender), /// Get head data for a specific para. HeadData(ParaId, oneshot::Sender), + /// Get a the candidate pending availability for a particular parachain by parachain / core index + CandidatePendingAvailability(ParaId, oneshot::Sender>), } /// A message to the Runtime API subsystem. 
@@ -398,6 +421,8 @@ pub enum AllMessages { AvailabilityDistribution(AvailabilityDistributionMessage), /// Message for the bitfield distribution subsystem. BitfieldDistribution(BitfieldDistributionMessage), + /// Message for the bitfield signing subsystem. + BitfieldSigning(BitfieldSigningMessage), /// Message for the Provisioner subsystem. Provisioner(ProvisionerMessage), /// Message for the PoV Distribution subsystem. diff --git a/parachain/src/primitives.rs b/parachain/src/primitives.rs index f6b69cd89441..65d890e26dcf 100644 --- a/parachain/src/primitives.rs +++ b/parachain/src/primitives.rs @@ -80,6 +80,36 @@ impl From for Id { fn from(x: u32) -> Self { Id(x) } } +impl From for Id { + fn from(x: usize) -> Self { + use sp_std::convert::TryInto; + // can't panic, so need to truncate + let x = x.try_into().unwrap_or(u32::MAX); + Id(x) + } +} + +// When we added a second From impl for Id, type inference could no longer +// determine which impl should apply for things like `5.into()`. It therefore +// raised a bunch of errors in our test code, scattered throughout the +// various modules' tests, that there is no impl of `From` (`i32` being +// the default numeric type). +// +// We can't use `cfg(test)` here, because that configuration directive does not +// propagate between crates, which would fail to fix tests in crates other than +// this one. +// +// Instead, let's take advantage of the observation that what really matters for a +// ParaId within a test context is that it is unique and constant. I believe that +// there is no case where someone does `(-1).into()` anyway, but if they do, it +// never matters whether the actual contained ID is `-1` or `4294967295`. Nobody +// does arithmetic on a `ParaId`; doing so would be a bug. +impl From for Id { + fn from(x: i32) -> Self { + Id(x as u32) + } +} + const USER_INDEX_START: u32 = 1000; /// The ID of the first user (non-system) parachain. 
diff --git a/roadmap/implementers-guide/src/node/availability/bitfield-signing.md b/roadmap/implementers-guide/src/node/availability/bitfield-signing.md index 736fac4ff1b4..0ca9badd32e2 100644 --- a/roadmap/implementers-guide/src/node/availability/bitfield-signing.md +++ b/roadmap/implementers-guide/src/node/availability/bitfield-signing.md @@ -4,6 +4,10 @@ Validators vote on the availability of a backed candidate by issuing signed bitf ## Protocol +Input: + +There is no dedicated input mechanism for bitfield signing. Instead, Bitfield Signing produces a bitfield representing the current state of availability on `StartWork`. + Output: - BitfieldDistribution::DistributeBitfield: distribute a locally signed bitfield @@ -18,8 +22,8 @@ Upon receipt of an `ActiveLeavesUpdate`, launch bitfield signing job for each `a Localized to a specific relay-parent `r` If not running as a validator, do nothing. +- Begin by waiting a fixed period of time so availability distribution has the chance to make candidates available. - Determine our validator index `i`, the set of backed candidates pending availability in `r`, and which bit of the bitfield each corresponds to. -- > TODO: wait T time for availability distribution? - Start with an empty bitfield. For each bit in the bitfield, if there is a candidate pending availability, query the [Availability Store](../utility/availability-store.md) for whether we have the availability chunk for our validator index. - For all chunks we have, set the corresponding bit in the bitfield. - Sign the bitfield and dispatch a `BitfieldDistribution::DistributeBitfield` message.