Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
df828ac
Indentation fix.
eskimor Mar 18, 2021
06d4d90
Prepare request-response for PoV fetching.
eskimor Mar 18, 2021
6a940eb
Drop old PoV distribution.
eskimor Mar 18, 2021
a7fc368
WIP: Fetch PoV directly from backing.
eskimor Mar 18, 2021
9847b81
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 19, 2021
e03ff75
Backing compiles.
eskimor Mar 19, 2021
a49b4d4
Runtime access and connection management for PoV distribution.
eskimor Mar 23, 2021
545e950
Get rid of seemingly dead code.
eskimor Mar 23, 2021
47d9f5f
Implement PoV fetching.
eskimor Mar 23, 2021
0a283ab
Don't send `ConnectToValidators` for empty list.
eskimor Mar 24, 2021
afd795f
Even better - no need to check over and over again.
eskimor Mar 24, 2021
1c3eec8
PoV fetching implemented.
eskimor Mar 24, 2021
fb24855
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 24, 2021
cceddce
Check PoV hash upon reception.
eskimor Mar 24, 2021
89f0bf9
Implement retry of PoV fetching in backing.
eskimor Mar 25, 2021
ab75fea
Avoid pointless validation spawning.
eskimor Mar 25, 2021
10da891
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 25, 2021
3915a57
Add jaeger span to pov requesting.
eskimor Mar 25, 2021
fa6409e
Add back tracing.
eskimor Mar 25, 2021
8b9c2d4
Review remarks.
eskimor Mar 25, 2021
2d27be5
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 25, 2021
4af7d2e
Whitespace.
eskimor Mar 25, 2021
5c09829
Whitespace again.
eskimor Mar 26, 2021
ea9bde4
Cleanup + fix tests.
eskimor Mar 27, 2021
4207eaf
Log to log target in overseer.
eskimor Mar 27, 2021
3691061
Fix more tests.
eskimor Mar 27, 2021
b1a201a
Don't fail if group cannot be found.
eskimor Mar 27, 2021
298fe9d
Simple test for PoV fetcher.
eskimor Mar 27, 2021
af9f12c
Handle missing group membership better.
eskimor Mar 27, 2021
0c30792
Add test for retry functionality.
eskimor Mar 27, 2021
eb47465
Fix flaky test.
eskimor Mar 27, 2021
071bcca
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 27, 2021
3fa5791
Spaces again.
eskimor Mar 28, 2021
82d4a11
Guide updates.
eskimor Mar 28, 2021
b58a2ab
Merge branch 'master' into rk-pov-distribution-2590
eskimor Mar 28, 2021
a0609e7
Spaces.
eskimor Mar 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement retry of PoV fetching in backing.
  • Loading branch information
eskimor committed Mar 25, 2021
commit 89f0bf97a35ba4c97a0ab02679cb53d0706c86cf
147 changes: 112 additions & 35 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ enum PoVData {
FetchFromValidator {
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
pov_hash:Hash,
pov_hash: Hash,
},
}

Expand All @@ -111,6 +111,8 @@ enum ValidatedCandidateCommand {
Second(BackgroundValidationResult),
// We were instructed to validate the candidate.
Attest(BackgroundValidationResult),
// We were not able to `Attest` because backing validator did not send us the PoV.
AttestNoPoV(CandidateHash),
}

impl std::fmt::Debug for ValidatedCandidateCommand {
Expand All @@ -121,6 +123,8 @@ impl std::fmt::Debug for ValidatedCandidateCommand {
write!(f, "Second({})", candidate_hash),
ValidatedCandidateCommand::Attest(_) =>
write!(f, "Attest({})", candidate_hash),
ValidatedCandidateCommand::AttestNoPoV(_) =>
write!(f, "Attest({})", candidate_hash),
}
}
}
Expand All @@ -132,6 +136,7 @@ impl ValidatedCandidateCommand {
ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
}
}
}
Expand All @@ -152,6 +157,8 @@ struct CandidateBackingJob {
issued_statements: HashSet<CandidateHash>,
/// These candidates are undergoing validation in the background.
awaiting_validation: HashSet<CandidateHash>,
/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
fallbacks: HashMap<CandidateHash, (AttestingData, Option<jaeger::Span>)>,
/// `Some(h)` if this job has already issued `Seconded` statement for some candidate with `h` hash.
seconded: Option<CandidateHash>,
/// The candidates that are includable, by hash. Each entry here indicates
Expand All @@ -165,6 +172,23 @@ struct CandidateBackingJob {
metrics: Metrics,
}

/// Retry state for attesting a candidate whose PoV still has to be fetched.
///
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this: all the data needed for spawning a
/// validation job, plus a list of other backing validators we can still try.
///
/// `Clone` is derived because one copy is stored in `CandidateBackingJob::fallbacks` while
/// another copy is handed to `kick_off_validation_work` for each attempt.
#[derive(Clone)]
struct AttestingData {
/// The candidate to attest.
candidate: CandidateReceipt,
/// Hash of the PoV we need to fetch.
pov_hash: Hash,
/// Validator we are currently trying to get the PoV from.
///
/// Overwritten with the next validator to ask whenever another attempt is started.
from_validator: ValidatorIndex,
/// Other backing validators we can try in case `from_validator` failed.
///
/// Validators whose `Valid` statement arrives while a validation job for the candidate is
/// already running are pushed here; a retry pops the most recently added entry.
backing: Vec<ValidatorIndex>,
}

/// Number of validators that form a quorum in a group of `n_validators`.
///
/// This is a strict majority: half of the group (rounded down) plus one.
const fn group_quorum(n_validators: usize) -> usize {
    let half = n_validators / 2;
    half + 1
}
Expand Down Expand Up @@ -363,7 +387,7 @@ async fn request_pov(
from_validator,
candidate_hash,
pov_hash,
tx,
tx,
}
))).await?;

Expand Down Expand Up @@ -431,13 +455,20 @@ async fn validate_and_make_available(
pov_hash,
} => {
let _span = span.as_ref().map(|s| s.child("request-pov"));
request_pov(
&mut tx_from,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
).await?
match request_pov(
&mut tx_from,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
).await {
Err(Error::FetchPoV) => {
tx_command.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash())).await.map_err(Error::Mpsc)?;
return Ok(())
}
Err(err) => return Err(err),
Ok(pov) => pov,
}
}
};

Expand Down Expand Up @@ -587,6 +618,24 @@ impl CandidateBackingJob {
self.issued_statements.insert(candidate_hash);
}
}
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
if let Some((attesting, span)) = self.fallbacks.get_mut(&candidate_hash) {
if let Some(index) = attesting.backing.pop() {
attesting.from_validator = index;
// Ok, another try:
let c_span = span.as_ref().map(|s| s.child("try"));
let attesting = attesting.clone();
self.kick_off_validation_work(attesting, c_span).await?
}

} else {
tracing::warn!(
target: LOG_TARGET,
"AttestNoPoV was triggered without fallback being available."
);
debug_assert!(false);
}
}
}

Ok(())
Expand Down Expand Up @@ -843,29 +892,23 @@ impl CandidateBackingJob {
}

/// Kick off validation work and distribute the result as a signed statement.
#[tracing::instrument(level = "trace", skip(self, pov, span), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, attesting, span), fields(subsystem = LOG_TARGET))]
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
pov: PoVData,
attesting: AttestingData,
span: Option<jaeger::Span>,
) -> Result<(), Error> {
let candidate_hash = summary.candidate;

let candidate_hash = attesting.candidate.hash();
if self.issued_statements.contains(&candidate_hash) {
return Ok(())
}

// We clone the commitments here because there are borrowck
// errors relating to this being a struct and methods borrowing the entirety of self
// and not just those things that the function uses.
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain();
let descriptor = candidate.descriptor().clone();
let descriptor = attesting.candidate.descriptor().clone();

tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
candidate_receipt = ?candidate,
candidate_receipt = ?attesting.candidate,
"Kicking off validation",
);

Expand All @@ -881,10 +924,16 @@ impl CandidateBackingJob {
return Ok(());
}

let pov = PoVData::FetchFromValidator {
from_validator: attesting.from_validator,
candidate_hash,
pov_hash: attesting.pov_hash,
};

self.background_validate_and_make_available(BackgroundValidationParams {
tx_from: self.tx_from.clone(),
tx_command: self.background_validation_tx.clone(),
candidate,
candidate: attesting.candidate,
relay_parent: self.parent,
pov,
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
Expand All @@ -903,30 +952,57 @@ impl CandidateBackingJob {
statement: SignedFullStatement,
) -> Result<(), Error> {
if let Some(summary) = self.import_statement(&statement, parent_span).await? {
if let Statement::Seconded(receipt) = statement.payload() {
if Some(summary.group_id) == self.assignment {
if Some(summary.group_id) != self.assignment {
return Ok(())
}
let (attesting, span) = match statement.payload() {
Statement::Seconded(receipt) => {
let candidate_hash = summary.candidate;

let span = self.get_unbacked_validation_child(
root_span,
summary.candidate,
summary.group_id,
);
let pov_hash = receipt.descriptor.pov_hash;
let candidate_hash = summary.candidate;
let pov_data = PoVData::FetchFromValidator {

let attesting = AttestingData {
candidate: self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain(),
pov_hash: receipt.descriptor.pov_hash,
from_validator: statement.validator_index(),
candidate_hash,
pov_hash
backing: Vec::new(),
};
let child = span.as_ref().map(|s| s.child("try"));
self.fallbacks.insert(summary.candidate, (attesting.clone(), span));
(attesting, child)
}
Statement::Valid(candidate_hash) => {
if let Some((attesting, span)) = self.fallbacks.get_mut(candidate_hash) {

let our_index = self.table_context.validator.as_ref().map(|v| v.index());
if our_index == Some(statement.validator_index()) {
return Ok(())
}

self.kick_off_validation_work(
summary,
pov_data,
span,
).await?;
if self.awaiting_validation.contains(candidate_hash) {
// Job already running:
attesting.backing.push(statement.validator_index());
return Ok(())
} else {
// No job, so start another try with current validator:
attesting.from_validator = statement.validator_index();
(attesting.clone(), span.as_ref().map(|s| s.child("try")))
}
} else {
return Ok(())
}
}
}
}
};

self.kick_off_validation_work(
attesting,
span,
).await?;
}
Ok(())
}

Expand Down Expand Up @@ -1159,6 +1235,7 @@ impl util::JobTrait for CandidateBackingJob {
required_collator,
issued_statements: HashSet::new(),
awaiting_validation: HashSet::new(),
fallbacks: HashMap::new(),
seconded: None,
unbacked_candidates: HashMap::new(),
backed: HashSet::new(),
Expand Down