Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d0aac4c
create a v1 primitives module
rphmeier Jul 9, 2020
db8a5de
Improve guide on availability types
rphmeier Jul 9, 2020
1312093
punctuate
rphmeier Jul 9, 2020
cbd848a
new parachains runtime uses new primitives
rphmeier Jul 9, 2020
14425bd
tests of new runtime now use new primitives
rphmeier Jul 9, 2020
8313f46
add ErasureChunk to guide
rphmeier Jul 9, 2020
7794af4
export erasure chunk from v1 primitives
rphmeier Jul 9, 2020
888fee0
subsystem crate uses v1 primitives
rphmeier Jul 9, 2020
25e7acf
node-primitives uses new v1 primitives
rphmeier Jul 9, 2020
3872d3e
port overseer to new primitives
rphmeier Jul 9, 2020
53cd37f
new-proposer uses v1 primitives (no ParachainHost anymore)
rphmeier Jul 9, 2020
072c5fe
fix no-std compilation for primitives
rphmeier Jul 9, 2020
5dc4aa7
service-new uses v1 primitives
rphmeier Jul 9, 2020
e0b0a31
network-bridge uses new primitives
rphmeier Jul 9, 2020
d066d3e
statement distribution uses v1 primitives
rphmeier Jul 9, 2020
09c81f8
PoV distribution uses v1 primitives; add PoV::hash fn
rphmeier Jul 9, 2020
438474f
move parachain to v0
rphmeier Jul 9, 2020
6189c4c
remove inclusion_inherent module and place into v1
rphmeier Jul 9, 2020
0aeee53
remove everything from primitives crate root
rphmeier Jul 9, 2020
9cb6238
remove some unused old types from v0 primitives
rphmeier Jul 9, 2020
55956a8
point everything else at primitives::v0
rphmeier Jul 9, 2020
25719ce
squanch some warns up
rphmeier Jul 9, 2020
e5cf500
add RuntimeDebug import to no-std as well
rphmeier Jul 9, 2020
023a1d4
Merge branch 'master' into rh-primitives-refactor
rphmeier Jul 9, 2020
1af103e
Merge branch 'master' into rh-primitives-refactor
rphmeier Jul 9, 2020
0d3013f
port over statement-table and validation
rphmeier Jul 9, 2020
8c2403f
fix final errors in validation and node-primitives
rphmeier Jul 9, 2020
3347e7f
add dummy Ord impl to committed candidate receipt
rphmeier Jul 9, 2020
b241140
guide: update CandidateValidationMessage
rphmeier Jul 9, 2020
c70badd
add primitive for validationoutputs
rphmeier Jul 9, 2020
43e8e30
expand CandidateValidationMessage further
rphmeier Jul 9, 2020
935d536
bikeshed
rphmeier Jul 9, 2020
3ec9437
add some impls to omitted-validation-data and available-data
rphmeier Jul 9, 2020
821c556
expand CandidateValidationMessage
rphmeier Jul 9, 2020
dc7cd08
make erasure-coding generic over v1/v0
rphmeier Jul 9, 2020
6306116
update usages of erasure-coding
rphmeier Jul 9, 2020
1bd4809
implement commitments.hash()
rphmeier Jul 9, 2020
afd20dd
use Arc<Pov> for CandidateValidation
rphmeier Jul 9, 2020
0fbbc42
improve new erasure-coding method names
rphmeier Jul 9, 2020
beb0dc4
fix up candidate backing
rphmeier Jul 9, 2020
d69770c
update docs a bit
rphmeier Jul 10, 2020
bbc2ea1
fix most tests and add short-circuiting to make_pov_available
rphmeier Jul 10, 2020
4015271
fix remainder of candidate backing tests
rphmeier Jul 10, 2020
c8b0af4
squanching warns
rphmeier Jul 10, 2020
8e5e35d
squanch it up
rphmeier Jul 10, 2020
5d352cc
some fallout
rphmeier Jul 10, 2020
9370455
overseer fallout
rphmeier Jul 10, 2020
60c3bb0
free from polkadot-test-service hell
rphmeier Jul 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix up candidate backing
  • Loading branch information
rphmeier committed Jul 9, 2020
commit beb0dc4f7d7c32c48e29a794a00b7e7db2469abd
165 changes: 88 additions & 77 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use bitvec::vec::BitVec;
Expand All @@ -38,12 +39,13 @@ use primitives::Pair;
use keystore::KeyStorePtr;
use polkadot_primitives::v1::{
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorPair, ValidatorId,
ValidatorIndex, HeadData, SigningContext, PoV, OmittedValidationData,
CandidateDescriptor, LocalValidationData, GlobalValidationSchedule, AvailableData,
ErasureChunk, ValidatorSignature, Hash, CandidateReceipt,
ValidatorIndex, SigningContext, PoV, OmittedValidationData,
CandidateDescriptor, AvailableData, ErasureChunk, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments,
};
use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
ValidationOutputs,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem,
Expand Down Expand Up @@ -91,8 +93,6 @@ struct CandidateBackingJob {
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,

/// `HeadData`s of the parachains that this validator is assigned to.
head_data: HeadData,
/// The `ParaId`s assigned to this validator.
assignment: ParaId,
/// We issued `Valid` or `Invalid` statements on about these candidates.
Expand Down Expand Up @@ -134,7 +134,7 @@ impl TableContextTrait for TableContext {
}

fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.iter().position(|&a| a == authority).is_some())
self.groups.get(group).map_or(false, |g| g.iter().position(|a| a == authority).is_some())
}

fn requisite_votes(&self, group: &ParaId) -> usize {
Expand Down Expand Up @@ -236,44 +236,61 @@ impl CandidateBackingJob {

async fn issue_candidate_invalid_message(
&mut self,
candidate: CommittedCandidateReceipt,
candidate: CandidateReceipt,
) -> Result<(), Error> {
self.tx_from.send(FromJob::CandidateSelection(
CandidateSelectionMessage::Invalid(self.parent, candidate.to_plain())
CandidateSelectionMessage::Invalid(self.parent, candidate)
)).await?;

Ok(())
}

/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
///
/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
async fn validate_and_second(
&mut self,
candidate: CandidateReceipt,
candidate: &CandidateReceipt,
pov: PoV,
) -> Result<ValidationResult, Error> {
let valid = self.request_candidate_validation(candidate.clone(), pov.clone()).await?;
let statement = match valid.0 {
ValidationResult::Valid => {
) -> Result<bool, Error> {
let valid = self.request_candidate_validation(
candidate.descriptor().clone(),
Arc::new(pov.clone()),
).await?;

let candidate_hash = candidate.hash();

let (was_valid, statement) = match valid {
ValidationResult::Valid(outputs) => {
// make PoV available for later distribution. Send data to the availability
// store to keep. Sign and dispatch `valid` statement to network if we
// have not seconded the given candidate.
self.make_pov_available(pov, valid.1, valid.2).await?;
self.issued_statements.insert(candidate.hash());
Statement::Seconded(candidate)
let commitments = self.make_pov_available(pov, outputs).await?;

let candidate = CommittedCandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments,
};
(true, Some(Statement::Seconded(candidate)))
}
ValidationResult::Invalid => {
let candidate_hash = candidate.hash();
self.issue_candidate_invalid_message(candidate).await?;
Statement::Invalid(candidate_hash)
// no need to issue a statement about this if we aren't seconding it.
//
// there's an infinite amount of garbage out there. no need to acknowledge
// all of it.
self.issue_candidate_invalid_message(candidate.clone()).await?;
(false, None)
}
};

if let Some(signed_statement) = self.sign_statement(statement) {
self.issued_statements.insert(candidate_hash);

if let Some(signed_statement) = statement.and_then(|s| self.sign_statement(s)) {
self.import_statement(&signed_statement).await?;
self.distribute_signed_statement(signed_statement).await?;
}

Ok(valid.0)
Ok(was_valid)
}

fn get_backed(&self) -> Vec<NewBackedCandidate> {
Expand Down Expand Up @@ -382,8 +399,8 @@ impl CandidateBackingJob {
let candidate_hash = candidate.hash();

if !self.issued_statements.contains(&candidate_hash) {
if let Ok(ValidationResult::Valid) = self.validate_and_second(
candidate,
if let Ok(true) = self.validate_and_second(
&candidate,
pov,
).await {
self.seconded = Some(candidate_hash);
Expand Down Expand Up @@ -412,17 +429,32 @@ impl CandidateBackingJob {
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
) -> Result<ValidationResult, Error> {
let candidate = self.table.get_candidate(&summary.candidate).ok_or(Error::CandidateNotFound)?;
let candidate = candidate.clone();
let descriptor = candidate.to_descriptor();
let candidate_hash = candidate.hash();
let pov = self.request_pov_from_distribution(descriptor).await?;
let v = self.request_candidate_validation(candidate, pov).await?;
) -> Result<(), Error> {
let candidate_hash = summary.candidate.clone();

let statement = match v.0 {
ValidationResult::Valid => {
Statement::Valid(candidate_hash)
if self.issued_statements.contains(&candidate_hash) {
return Ok(())
}

// We clone the commitments here because there are borrowck
// errors relating to this being a struct and methods borrowing the entirety of self
// and not just those things that the function uses.
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
let expected_commitments = candidate.commitments.clone();

let descriptor = candidate.descriptor().clone();
let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
let v = self.request_candidate_validation(descriptor, pov.clone()).await?;

let statement = match v {
ValidationResult::Valid(outputs) => {
// If validation produces a new set of commitments, we vote the candidate as invalid.
let commitments = self.make_pov_available((&*pov).clone(), outputs).await?;
if commitments != expected_commitments {
Statement::Invalid(candidate_hash)
} else {
Statement::Valid(candidate_hash)
}
}
ValidationResult::Invalid => {
Statement::Invalid(candidate_hash)
Expand All @@ -435,7 +467,7 @@ impl CandidateBackingJob {
self.distribute_signed_statement(signed_statement).await?;
}

Ok(v.0)
Ok(())
}

/// Import the statement and kick off validation work if it is a part of our assignment.
Expand Down Expand Up @@ -493,29 +525,26 @@ impl CandidateBackingJob {
async fn request_pov_from_distribution(
&mut self,
descriptor: CandidateDescriptor,
) -> Result<PoV, Error> {
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();

self.tx_from.send(FromJob::PoVDistribution(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;

let pov = rx.await?;
Ok((*pov).clone())
Ok(rx.await?)
}

async fn request_candidate_validation(
&mut self,
candidate: CommittedCandidateReceipt,
pov: PoV,
) -> Result<(ValidationResult, GlobalValidationSchedule, LocalValidationData), Error> {
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();

self.tx_from.send(FromJob::CandidateValidation(
CandidateValidationMessage::Validate(
self.parent,
CandidateValidationMessage::ValidateFromChainState(
candidate,
self.head_data.clone(),
pov,
tx,
)
Expand All @@ -541,12 +570,11 @@ impl CandidateBackingJob {
async fn make_pov_available(
&mut self,
pov: PoV,
global_validation: GlobalValidationSchedule,
local_validation: LocalValidationData,
) -> Result<(), Error> {
outputs: ValidationOutputs,
) -> Result<CandidateCommitments, Error> {
let omitted_validation = OmittedValidationData {
global_validation,
local_validation,
global_validation: outputs.global_validation_schedule,
local_validation: outputs.local_validation_data,
};

let available_data = AvailableData {
Expand All @@ -560,18 +588,25 @@ impl CandidateBackingJob {
)?;

let branches = erasure_coding::branches(chunks.as_ref());
let erasure_root = branches.root();

for (index, (chunk, proof)) in chunks.iter().zip(branches.map(|(proof, _)| proof)).enumerate() {
for (index, (proof, chunk)) in branches.enumerate() {
let chunk = ErasureChunk {
chunk: chunk.clone(),
chunk: chunk.to_vec(),
index: index as u32,
proof,
};

self.store_chunk(index as ValidatorIndex, chunk).await?;
}

Ok(())
Ok(CandidateCommitments {
fees: outputs.fees,
upward_messages: outputs.upward_messages,
erasure_root,
new_validation_code: outputs.new_validation_code,
head_data: outputs.head_data,
})
}

async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
Expand Down Expand Up @@ -650,13 +685,7 @@ async fn run_job(
}
}

let (
head_data,
signing_context,
) = futures::try_join!(
request_head_data(parent, &mut tx_from, assignment).await?,
request_signing_context(parent, &mut tx_from).await?,
)?;
let signing_context = request_signing_context(parent, &mut tx_from).await?.await?;

let table_context = TableContext {
signing_context,
Expand All @@ -669,7 +698,6 @@ async fn run_job(
parent,
rx_to,
tx_from,
head_data,
assignment,
issued_statements: HashSet::new(),
seconded: None,
Expand Down Expand Up @@ -729,23 +757,6 @@ async fn request_signing_context(
Ok(rx)
}

/// Request `HeadData` for some `ParaId` from `RuntimeApi`.
async fn request_head_data(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
id: ParaId,
) -> Result<oneshot::Receiver<HeadData>, Error> {
let (tx, rx) = oneshot::channel();

s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
parent,
RuntimeApiRequest::HeadData(id, tx),
)
)).await?;

Ok(rx)
}

impl<S: Spawn> Jobs<S> {
fn new(spawner: S) -> Self {
Self {
Expand Down Expand Up @@ -927,7 +938,7 @@ mod tests {
use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::{
AssignmentKind, CollatorId, CoreAssignment, BlockData, CoreIndex, GroupIndex, ValidityAttestation,
CandidateCommitments,
CandidateCommitments, LocalValidationData, GlobalValidationSchedule, HeadData,
};
use assert_matches::assert_matches;

Expand Down