Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions node/network/availability-distribution/src/requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,13 @@ impl Requester {
{
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
if let Some(activated) = activated {
// Stale leaves happen after a reversion - we don't want to re-run availability there.
if let LeafStatus::Fresh = activated.status {
self.start_requesting_chunks(ctx, runtime, activated).await?;
}
// Stale leaves happen after a reversion - we don't want to re-run availability there.
if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) {
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, leaf).await?;
}

self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}
Expand Down Expand Up @@ -207,16 +206,21 @@ impl Requester {
let tx = self.tx.clone();
let metrics = self.metrics.clone();

// We need to run the fetching task in the context of the leaf session,
// so we need to pass in the parent of the leaf to `with_session_info()`.
let leaf_parent =
get_block_ancestors(ctx, leaf, 1).await?.into_iter().next().unwrap_or(leaf);
let task_cfg = self
.session_cache
.with_session_info(
ctx,
runtime,
// We use leaf here, as relay_parent must be in the same session as the
// leaf. (Cores are dropped at session boundaries.) At the same time,
// only leaves are guaranteed to be fetchable by the state trie.
leaf,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
// We need to use the leaf parent here, such that we fetch the correct backing group
// when the candidate was backed at the session boudnary.
// The state trie still contains the block as the chances of this parent to be
// already finalized are negligible.
leaf_parent,
|info| FetchTaskConfig::new(leaf_parent, &core, tx, metrics, info),
)
.await?;

Expand Down Expand Up @@ -274,7 +278,7 @@ where

// `head` is the child of the first block in `ancestors`, request its session index.
let head_session_index = match ancestors_iter.next() {
Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
Some(parent) => runtime.get_session_index_for_child(ctx.sender(), *parent).await?,
None => {
// No first element, i.e. empty.
return Ok(ancestors)
Expand All @@ -285,7 +289,7 @@ where
// The first parent is skipped.
for parent in ancestors_iter {
// Parent is the i-th ancestor, request session index for its child -- (i-1)th element.
let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), *parent).await?;
if session_index == head_session_index {
session_ancestry_len += 1;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl SessionCache {
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = runtime.get_session_index(ctx.sender(), parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?;

if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
Expand Down Expand Up @@ -177,13 +177,15 @@ impl SessionCache {
&self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
parent: Hash,
relay_parent: Hash,
session_index: SessionIndex,
) -> Result<Option<SessionInfo>, Error>
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;
let info = runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;

let discovery_keys = info.session_info.discovery_keys.clone();
let mut validator_groups = info.session_info.validator_groups.clone();
Expand Down
2 changes: 1 addition & 1 deletion node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = &runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?
Expand Down
29 changes: 24 additions & 5 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,26 @@ impl DisputeSender {
dispute: (SessionIndex, CandidateHash),
) -> Result<()> {
let (session_index, candidate_hash) = dispute;
// We need some relay chain head for context for receiving session info information:
let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?;
// A relay chain head is required as context for receiving session info information from runtime and
// storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at
// least one active head which, by `session_index`, is at least as recent as the `dispute` passed in.
// We need to avoid picking an older one from a session that might not yet exist in storage.
// Related to <https://github.com/paritytech/polkadot/issues/4730> .
let ref_head = self
.active_sessions
.iter()
.find_map(|(active_session_index, head_hash)| {
// There might be more than one session index that is at least as recent as the dispute
// so we just pick the first one. Keep in mind we are talking about the session index for the
// child of block identified by `head_hash` and not the session index for the block.
if active_session_index >= &session_index {
Some(head_hash)
} else {
None
}
})
.ok_or(NonFatal::NoActiveHeads)?;

let info = runtime
.get_session_info_by_index(ctx.sender(), *ref_head, session_index)
.await?;
Expand Down Expand Up @@ -293,7 +311,7 @@ impl DisputeSender {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
) -> Result<bool> {
let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?;
let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
let updated = new_sessions_raw != old_sessions_raw;
Expand All @@ -306,14 +324,15 @@ impl DisputeSender {
/// Retrieve the currently active sessions.
///
/// List is all indices of all active sessions together with the head that was used for the query.
async fn get_active_session_indeces<Context: SubsystemContext>(
async fn get_active_session_indices<Context: SubsystemContext>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_heads: &Vec<Hash>,
) -> Result<HashMap<SessionIndex, Hash>> {
let mut indeces = HashMap::new();
// Iterate all heads we track as active and fetch the child' session indices.
for head in active_heads {
let session_index = runtime.get_session_index(ctx.sender(), *head).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
indeces.insert(session_index, *head);
}
Ok(indeces)
Expand Down
6 changes: 4 additions & 2 deletions node/network/dispute-distribution/src/sender/send_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ impl SendTask {
active_sessions: &HashMap<SessionIndex, Hash>,
) -> Result<HashSet<AuthorityDiscoveryId>> {
let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent;
// Parachain validators:
// Retrieve all authorities which participated in the parachain consensus of the session
// in which the candidate was backed.
let info = runtime
.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
.await?;
Expand All @@ -219,7 +220,8 @@ impl SendTask {
.map(|(_, v)| v.clone())
.collect();

// Current authorities:
// Retrieve all authorities for the current session as indicated by the active
// heads we are tracking.
for (session_index, head) in active_sessions.iter() {
let info =
runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
Expand Down
7 changes: 4 additions & 3 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,9 @@ struct ActiveHeadData {
statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
/// Large statements we are waiting for with associated meta data.
waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
/// The validators at this head.
/// The parachain validators at the head's child session index.
validators: Vec<ValidatorId>,
/// The session index this head is at.
/// The current session index of this fork.
session_index: sp_staking::SessionIndex,
/// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap<ValidatorIndex, usize>,
Expand Down Expand Up @@ -1798,8 +1798,9 @@ impl StatementDistributionSubsystem {
"New active leaf",
);

// Retrieve the parachain validators at the child of the head we track.
let session_index =
runtime.get_session_index(ctx.sender(), relay_parent).await?;
runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;

/// The status of an activated leaf.
#[derive(Debug, Clone, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub enum LeafStatus {
/// A leaf is fresh when it's the first time the leaf has been encountered.
/// Most leaves should be fresh.
Expand Down
11 changes: 8 additions & 3 deletions node/subsystem-util/src/rolling_session_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl RollingSessionWindow {
window_size: SessionWindowSize,
block_hash: Hash,
) -> Result<Self, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let session_index = get_session_index_for_child(ctx, block_hash).await?;

let window_start = session_index.saturating_sub(window_size.get() - 1);

Expand Down Expand Up @@ -160,7 +160,7 @@ impl RollingSessionWindow {
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let session_index = get_session_index_for_child(ctx, block_hash).await?;

let old_window_start = self.earliest_session;

Expand Down Expand Up @@ -212,7 +212,12 @@ impl RollingSessionWindow {
}
}

async fn get_session_index_for_head(
// Returns the session index expected at any child of the `parent` block.
//
// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's
// cleaner to just call the runtime API directly without needing to create an instance
// of `RuntimeInfo`.
async fn get_session_index_for_child(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionIndex, SessionsUnavailable> {
Expand Down
19 changes: 10 additions & 9 deletions node/subsystem-util/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ impl RuntimeInfo {
}
}

/// Retrieve the current session index.
pub async fn get_session_index<Sender>(
/// Returns the session index expected at any child of the `parent` block.
/// This does not return the session index for the `parent` block.
pub async fn get_session_index_for_child<Sender>(
&mut self,
sender: &mut Sender,
parent: Hash,
Expand All @@ -141,14 +142,14 @@ impl RuntimeInfo {
pub async fn get_session_info<'a, Sender>(
&'a mut self,
sender: &mut Sender,
parent: Hash,
relay_parent: Hash,
) -> Result<&'a ExtendedSessionInfo>
where
Sender: SubsystemSender,
{
let session_index = self.get_session_index(sender, parent).await?;
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;

self.get_session_info_by_index(sender, parent, session_index).await
self.get_session_info_by_index(sender, relay_parent, session_index).await
}

/// Get `ExtendedSessionInfo` by session index.
Expand Down Expand Up @@ -185,7 +186,7 @@ impl RuntimeInfo {
pub async fn check_signature<Sender, Payload, RealPayload>(
&mut self,
sender: &mut Sender,
parent: Hash,
relay_parent: Hash,
signed: UncheckedSigned<Payload, RealPayload>,
) -> Result<
std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>,
Expand All @@ -195,9 +196,9 @@ impl RuntimeInfo {
Payload: EncodeAs<RealPayload> + Clone,
RealPayload: Encode + Clone,
{
let session_index = self.get_session_index(sender, parent).await?;
let info = self.get_session_info_by_index(sender, parent, session_index).await?;
Ok(check_signature(session_index, &info.session_info, parent, signed))
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
let info = self.get_session_info_by_index(sender, relay_parent, session_index).await?;
Ok(check_signature(session_index, &info.session_info, relay_parent, signed))
}

/// Build `ValidatorInfo` for the current session.
Expand Down