Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
62184b4
BEGIN ASYNC candidate-backing CHANGES
rphmeier Apr 28, 2022
821ed42
rename & document modes
rphmeier Apr 29, 2022
1fc4928
answer prospective validation data requests
rphmeier May 18, 2022
39f2076
GetMinimumRelayParents request is now plural
rphmeier May 19, 2022
80af4be
implement an implicit view utility for backing subsystems
rphmeier May 19, 2022
7c58f7a
implicit-view: get allowed relay parents
rphmeier May 23, 2022
fbcedac
refactorings and improvements to implicit view
rphmeier May 23, 2022
126ed91
add some TODOs for tests
rphmeier May 23, 2022
a5994a7
split implicit view updates into 2 functions
rphmeier May 25, 2022
8944760
backing: define State to prepare for functional refactor
rphmeier May 25, 2022
c34d0e6
add some docs
rphmeier May 25, 2022
20cd422
backing: implement bones of new leaf activation logic
rphmeier May 25, 2022
2b7d883
backing: create per-relay-parent-states
rphmeier May 25, 2022
923b28a
use new handle_active_leaves_update
rphmeier May 25, 2022
b9280d1
begin extracting logic from CandidateBackingJob
rphmeier May 27, 2022
967156e
mostly extract statement import from job logic
rphmeier May 27, 2022
b25acb7
handle statement imports outside of job logic
rphmeier May 27, 2022
df95962
do some TODO planning for prospective parachains integration
rphmeier May 27, 2022
458d24d
finish rewriting backing subsystem in functional style
rphmeier May 27, 2022
22a9d5e
add prospective parachains mode to relay parent entries
rphmeier May 27, 2022
fc0c4e4
fmt
rphmeier May 27, 2022
a4df277
add a RejectedByProspectiveParachains error
rphmeier May 27, 2022
2f202d0
notify prospective parachains of seconded and backed candidates
rphmeier May 28, 2022
910b997
always validate candidates exhaustively in backing.
rphmeier May 28, 2022
19d7a43
return persisted_validation_data from validation
rphmeier May 28, 2022
7f24629
handle rejections by prospective parachains
rphmeier May 28, 2022
22ead26
implement seconding sanity check
rphmeier May 28, 2022
6738ee9
invoke validate_and_second
rphmeier May 28, 2022
c54e1ca
Alter statement table to allow multiple seconded messages per validator
rphmeier May 28, 2022
e855b6c
refactor backing to have statements carry PVD
rphmeier May 30, 2022
eb0ee29
clean up all warnings
rphmeier May 30, 2022
d80c190
Add tests for implicit view
slumber May 31, 2022
21a381b
fix per_relay_parent pruning in backing
rphmeier May 31, 2022
47a9832
BEGIN STATEMENT DISTRIBUTION WORK
rphmeier May 31, 2022
b8f5282
mostly make network bridge amenable to vstaging
rphmeier May 31, 2022
843cb7b
network-bridge: fully adapt to vstaging
rphmeier May 31, 2022
89cf386
add some TODOs for tests
rphmeier May 31, 2022
a320e44
fix fallout in bitfield-distribution
rphmeier May 31, 2022
3cc56d1
add some test TODOs
rphmeier May 31, 2022
00f410d
fix fallout in gossip-support
rphmeier May 31, 2022
1a60a0c
fmt
rphmeier May 31, 2022
6f16adf
collator-protocol: fix message fallout
rphmeier May 31, 2022
7e46d8a
collator-protocol: load PVD from runtime
rphmeier May 31, 2022
86007fb
make things compile
rphmeier Jun 2, 2022
faf00f6
fmt
rphmeier Jun 2, 2022
f43c8f6
begin extracting view logic to separate module
rphmeier Jun 2, 2022
759e1d1
create deeper submodule and add per-peer knowledge
rphmeier Jun 2, 2022
51ca0ef
add ProspectiveParachainsMessage to statement-distribution
rphmeier Jun 2, 2022
778627b
make recv_runtime public
rphmeier Jun 2, 2022
df6f7c0
add ChainApiMessage to outgoing of statement-dist
rphmeier Jun 2, 2022
9af0013
begin new handle_active_leaves_update
rphmeier Jun 2, 2022
ce6e3d8
instantiate relay-parent-info without prospective data
rphmeier Jun 2, 2022
5bdb0ec
add staging-network feature to protocol
rphmeier Jun 5, 2022
8c9158a
rename to network-protocol staging
rphmeier Jun 5, 2022
dc6ee0d
refactor view to better accomodate both protcools
rphmeier Jun 5, 2022
a45cb24
begin fleshing out with_prospective
rphmeier Jun 5, 2022
d5e8e88
extract some tests to without_prospective; comment the rest
rphmeier Jun 5, 2022
3a095e6
begin high-level View API
rphmeier Jun 5, 2022
406d87d
fmt
rphmeier Jun 5, 2022
5c8e048
Merge branch 'rh-async-backing-feature' into rh-async-backing-integra…
slumber Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
mostly extract statement import from job logic
  • Loading branch information
rphmeier committed May 27, 2022
commit 967156ed9fca047c3d2fa34d60a25fac1758c644
283 changes: 227 additions & 56 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,7 @@ async fn handle_validated_candidate_command<Context>(
// outdated - we now allow seconding multiple candidates
// per relay-parent. update it to properly defend against
// seconding stuff wrongly.
if !rp_state.issued_statements.contains(&candidate_hash)
{
if !rp_state.issued_statements.contains(&candidate_hash) {
// TODO [now]: note the candidate as seconded.
rp_state.issued_statements.insert(candidate_hash);
metrics.on_candidate_seconded();
Expand All @@ -715,26 +714,29 @@ async fn handle_validated_candidate_command<Context>(
commitments,
});


// TODO [now]:
// implement `sign_import_and_distribute_statement`
// for PerRelayParentState.
//
// if let Some(stmt) = self
// .sign_import_and_distribute_statement(ctx, statement, root_span)
// .await?
// {
// ctx.send_message(CollatorProtocolMessage::Seconded(
// rp_state.parent,
// stmt,
// ))
// .await;
// }
if let Some(stmt) = sign_import_and_distribute_statement(
ctx,
rp_state,
statement,
state.keystore.clone(),
metrics,
)
.await?
{
ctx.send_message(CollatorProtocolMessage::Seconded(
rp_state.parent,
stmt,
))
.await;
}
}
},
Err(candidate) => {
ctx.send_message(CollatorProtocolMessage::Invalid(rp_state.parent, candidate))
.await;
ctx.send_message(CollatorProtocolMessage::Invalid(
rp_state.parent,
candidate,
))
.await;
},
},
ValidatedCandidateCommand::Attest(res) => {
Expand All @@ -745,13 +747,18 @@ async fn handle_validated_candidate_command<Context>(
if res.is_ok() {
let statement = Statement::Valid(candidate_hash);

// TODO [now]: needs implementation of `sign_import_and_distribute`
// self.sign_import_and_distribute_statement(ctx, statement, &root_span)
// .await?;
sign_import_and_distribute_statement(
ctx,
rp_state,
statement,
state.keystore.clone(),
metrics,
)
.await?;
}
rp_state.issued_statements.insert(candidate_hash);
}
}
},
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
if let Some((attesting, span)) = rp_state.fallbacks.get_mut(&candidate_hash) {
if let Some(index) = attesting.backing.pop() {
Expand All @@ -770,18 +777,212 @@ async fn handle_validated_candidate_command<Context>(
);
debug_assert!(false);
}
}
},
}
}
},
None => {
// simple race condition; can be ignored = this relay-parent
// is no longer relevant.
}
},
}

Ok(())
}

async fn sign_statement(
rp_state: &PerRelayParentState,
statement: Statement,
keystore: SyncCryptoStorePtr,
metrics: &Metrics,
) -> Option<SignedFullStatement> {
let signed = rp_state
.table_context
.validator
.as_ref()?
.sign(keystore, statement)
.await
.ok()
.flatten()?;
metrics.on_statement_signed();
Some(signed)
}

/// The dispute coordinator keeps track of all statements by validators about every recent
/// candidate.
///
/// When importing a statement, this should be called access the candidate receipt either
/// from the statement itself or from the underlying statement table in order to craft
/// and dispatch the notification to the dispute coordinator.
///
/// This also does bounds-checking on the validator index and will return an error if the
/// validator index is out of bounds for the current validator set. It's expected that
/// this should never happen due to the interface of the candidate backing subsystem -
/// the networking component responsible for feeding statements to the backing subsystem
/// is meant to check the signature and provenance of all statements before submission.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn dispatch_new_statement_to_dispute_coordinator<Context>(
ctx: &mut Context,
rp_state: &PerRelayParentState,
candidate_hash: CandidateHash,
statement: &SignedFullStatement,
) -> Result<(), ValidatorIndexOutOfBounds> {
// Dispatch the statement to the dispute coordinator.
let validator_index = statement.validator_index();
let signing_context =
SigningContext { parent_hash: rp_state.parent, session_index: rp_state.session_index };

let validator_public = match rp_state.table_context.validators.get(validator_index.0 as usize) {
None => return Err(ValidatorIndexOutOfBounds),
Some(v) => v,
};

let maybe_candidate_receipt = match statement.payload() {
Statement::Seconded(receipt) => Some(receipt.to_plain()),
Statement::Valid(candidate_hash) => {
// Valid statements are only supposed to be imported
// once we've seen at least one `Seconded` statement.
rp_state.table.get_candidate(&candidate_hash).map(|c| c.to_plain())
},
};

let maybe_signed_dispute_statement = SignedDisputeStatement::from_backing_statement(
statement.as_unchecked(),
signing_context,
validator_public.clone(),
)
.ok();

if let (Some(candidate_receipt), Some(dispute_statement)) =
(maybe_candidate_receipt, maybe_signed_dispute_statement)
{
ctx.send_message(DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt,
session: rp_state.session_index,
statements: vec![(dispute_statement, validator_index)],
pending_confirmation: None,
})
.await;
}

Ok(())
}

/// Import a statement into the statement table and return the summary of the import.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn import_statement<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
statement: &SignedFullStatement,
) -> Result<Option<TableSummary>, Error> {
gum::debug!(
target: LOG_TARGET,
statement = ?statement.payload().to_compact(),
validator_index = statement.validator_index().0,
"Importing statement",
);

let candidate_hash = statement.payload().candidate_hash();

if let Err(ValidatorIndexOutOfBounds) =
dispatch_new_statement_to_dispute_coordinator(ctx, rp_state, candidate_hash, &statement)
.await
{
gum::warn!(
target: LOG_TARGET,
session_index = ?rp_state.session_index,
relay_parent = ?rp_state.parent,
validator_index = statement.validator_index().0,
"Supposedly 'Signed' statement has validator index out of bounds."
);

return Ok(None)
}

let stmt = primitive_statement_to_table(statement);

// TODO [now]: we violate the pre-existing checks that each validator may
// only second one candidate.
//
// We will need to address this so we don't get errors incorrectly.
let summary = rp_state.table.import_statement(&rp_state.table_context, stmt);

if let Some(attested) = summary
.as_ref()
.and_then(|s| rp_state.table.attested_candidate(&s.candidate, &rp_state.table_context))
{
let candidate_hash = attested.candidate.hash();
// `HashSet::insert` returns true if the thing wasn't in there already.
if rp_state.backed.insert(candidate_hash) {
if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
relay_parent = ?rp_state.parent,
para_id = %backed.candidate.descriptor.para_id,
"Candidate backed",
);

// The provisioner waits on candidate-backing, which means
// that we need to send unbounded messages to avoid cycles.
//
// Backed candidates are bounded by the number of validators,
// parachains, and the block production rate of the relay chain.
let message = ProvisionerMessage::ProvisionableData(
rp_state.parent,
ProvisionableData::BackedCandidate(backed.receipt()),
);
ctx.send_unbounded_message(message);
}
}
}

issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);

Ok(summary)
}

/// Check if there have happened any new misbehaviors and issue necessary messages.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
fn issue_new_misbehaviors<Context>(
ctx: &mut Context,
relay_parent: Hash,
table: &mut Table<TableContext>,
) {
// collect the misbehaviors to avoid double mutable self borrow issues
let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
for (validator_id, report) in misbehaviors {
// The provisioner waits on candidate-backing, which means
// that we need to send unbounded messages to avoid cycles.
//
// Misbehaviors are bounded by the number of validators and
// the block production protocol.
ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
relay_parent,
ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
));
}
}

#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn sign_import_and_distribute_statement<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
statement: Statement,
keystore: SyncCryptoStorePtr,
metrics: &Metrics,
) -> Result<Option<SignedFullStatement>, Error> {
if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics).await {
import_statement(ctx, rp_state, &signed_statement).await?;
let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
ctx.send_unbounded_message(smsg);

Ok(Some(signed_statement))
} else {
Ok(None)
}
}

struct JobAndSpan<Context> {
job: CandidateBackingJob<Context>,
span: PerLeafSpan,
Expand Down Expand Up @@ -1277,23 +1478,6 @@ impl<Context> CandidateBackingJob<Context> {
Ok(())
}

async fn sign_import_and_distribute_statement(
&mut self,
ctx: &mut Context,
statement: Statement,
root_span: &jaeger::Span,
) -> Result<Option<SignedFullStatement>, Error> {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(ctx, &signed_statement, root_span).await?;
let smsg = StatementDistributionMessage::Share(self.parent, signed_statement.clone());
ctx.send_unbounded_message(smsg);

Ok(Some(signed_statement))
} else {
Ok(None)
}
}

/// Check if there have happened any new misbehaviors and issue necessary messages.
fn issue_new_misbehaviors(&mut self, sender: &mut impl overseer::CandidateBackingSenderTrait) {
// collect the misbehaviors to avoid double mutable self borrow issues
Expand Down Expand Up @@ -1664,19 +1848,6 @@ impl<Context> CandidateBackingJob<Context> {
Ok(())
}

async fn sign_statement(&mut self, statement: Statement) -> Option<SignedFullStatement> {
let signed = self
.table_context
.validator
.as_ref()?
.sign(self.keystore.clone(), statement)
.await
.ok()
.flatten()?;
self.metrics.on_statement_signed();
Some(signed)
}

/// Insert or get the unbacked-span for the given candidate hash.
fn insert_or_get_unbacked_span(
&mut self,
Expand Down