Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions node/network/availability-distribution/src/requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,36 +98,36 @@ impl Requester {
tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there.
let activated = activated.and_then(|h| match h.status {
if let Some(leaf) = activated.and_then(|h| match h.status {
LeafStatus::Stale => None,
LeafStatus::Fresh => Some(h),
});
}) {
self.start_requesting_chunks(ctx, runtime, leaf).await?;
}

// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}

/// Start requesting chunks for newly imported heads.
/// Start requesting chunks for newly imported relay chain head.
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
new_heads: impl Iterator<Item = ActivatedLeaf>,
leaf: ActivatedLeaf,
) -> super::Result<()>
where
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = get_occupied_cores(ctx, leaf).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
self.add_cores(ctx, runtime, leaf, cores).await?;
}
let cores = get_occupied_cores(ctx, leaf.hash).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
self.add_cores(ctx, runtime, leaf.hash, cores).await?;
Ok(())
}

Expand Down Expand Up @@ -175,8 +175,9 @@ impl Requester {
ctx,
runtime,
// We use leaf here, as relay_parent must be in the same session as the
// leaf. (Cores are dropped at session boundaries.) At the same time,
// only leaves are guaranteed to be fetchable by the state trie.
// leaf. This is guaranteed by runtime which ensures that cores are cleared
// at session boundaries. At the same time, only leaves are guaranteed to
// be fetchable by the state trie.
leaf,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl SessionCache {
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = runtime.get_session_index(ctx.sender(), parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?;

if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
Expand Down Expand Up @@ -177,13 +177,15 @@ impl SessionCache {
&self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
parent: Hash,
relay_parent: Hash,
session_index: SessionIndex,
) -> Result<Option<SessionInfo>, Error>
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;
let info = runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;

let discovery_keys = info.session_info.discovery_keys.clone();
let mut validator_groups = info.session_info.validator_groups.clone();
Expand Down
2 changes: 1 addition & 1 deletion node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = &runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?
Expand Down
27 changes: 22 additions & 5 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,24 @@ impl DisputeSender {
dispute: (SessionIndex, CandidateHash),
) -> Result<()> {
let (session_index, candidate_hash) = dispute;
// We need some relay chain head for context for receiving session information:
let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?;
// A relay chain head is required as context for receiving session information from runtime and
// storage. We will iterate `active_sessions` to find a suitable head. We assume that there is at
// least one active head which, by `session_index`, is at least as recent as the `dispute` passed in.
// We need to avoid picking an older one from a session that might not yet exist in storage.
// Related to https://github.com/paritytech/polkadot/issues/4730.
let ref_head = self
.active_sessions
.iter()
.find_map(|(active_session_index, head_hash)| {
// Pick the first one that is at least as recent as the dispute.
if active_session_index >= &session_index {
Some(head_hash)
} else {
None
}
})
.ok_or(NonFatal::NoActiveHeads)?;

let info = runtime
.get_session_info_by_index(ctx.sender(), *ref_head, session_index)
.await?;
Expand Down Expand Up @@ -293,7 +309,7 @@ impl DisputeSender {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
) -> Result<bool> {
let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?;
let new_sessions = get_active_session_indices(ctx, runtime, &self.active_heads).await?;
let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
let updated = new_sessions_raw != old_sessions_raw;
Expand All @@ -306,14 +322,15 @@ impl DisputeSender {
/// Retrieve the currently active sessions.
///
/// List is all indices of all active sessions together with the head that was used for the query.
async fn get_active_session_indeces<Context: SubsystemContext>(
async fn get_active_session_indices<Context: SubsystemContext>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_heads: &Vec<Hash>,
) -> Result<HashMap<SessionIndex, Hash>> {
let mut indeces = HashMap::new();
// Iterate all heads we track as active and fetch the child's session indices.
for head in active_heads {
let session_index = runtime.get_session_index(ctx.sender(), *head).await?;
let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
indeces.insert(session_index, *head);
}
Ok(indeces)
Expand Down
6 changes: 4 additions & 2 deletions node/network/dispute-distribution/src/sender/send_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ impl SendTask {
active_sessions: &HashMap<SessionIndex, Hash>,
) -> Result<HashSet<AuthorityDiscoveryId>> {
let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent;
// Parachain validators:
// Retrieve all authorities which participated in the parachain consensus of the session
// in which the candidate was backed.
let info = runtime
.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
.await?;
Expand All @@ -219,7 +220,8 @@ impl SendTask {
.map(|(_, v)| v.clone())
.collect();

// Current authorities:
// Retrieve all authorities for the current session as indicated by the active
// heads we are tracking.
for (session_index, head) in active_sessions.iter() {
let info =
runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
Expand Down
7 changes: 4 additions & 3 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,9 @@ struct ActiveHeadData {
statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
/// Large statements we are waiting for with associated meta data.
waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
/// The validators at this head.
/// The parachain validators at the head's child session index.
validators: Vec<ValidatorId>,
/// The session index this head is at.
/// The session index this head's child is at.
session_index: sp_staking::SessionIndex,
/// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap<ValidatorIndex, usize>,
Expand Down Expand Up @@ -1798,8 +1798,9 @@ impl StatementDistributionSubsystem {
"New active leaf",
);

// Retrieve the parachain validators at the child of the head we track.
let session_index =
runtime.get_session_index(ctx.sender(), relay_parent).await?;
runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?;
Expand Down
11 changes: 8 additions & 3 deletions node/subsystem-util/src/rolling_session_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl RollingSessionWindow {
window_size: SessionWindowSize,
block_hash: Hash,
) -> Result<Self, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let session_index = get_session_index_for_child(ctx, block_hash).await?;

let window_start = session_index.saturating_sub(window_size.get() - 1);

Expand Down Expand Up @@ -160,7 +160,7 @@ impl RollingSessionWindow {
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
let session_index = get_session_index_for_head(ctx, block_hash).await?;
let session_index = get_session_index_for_child(ctx, block_hash).await?;

let old_window_start = self.earliest_session;

Expand Down Expand Up @@ -212,7 +212,12 @@ impl RollingSessionWindow {
}
}

async fn get_session_index_for_head(
// Returns the session index expected at any child of the `parent` block.
//
// Note: This internal `fn` is a duplicate of the one exported by `subsystem-util`,
// that is because `RollingSessionWindow` already implements its own caching mechanism.
// It would not make sense to have 2 layers of caching.
async fn get_session_index_for_child(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
) -> Result<SessionIndex, SessionsUnavailable> {
Expand Down
19 changes: 10 additions & 9 deletions node/subsystem-util/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ impl RuntimeInfo {
}
}

/// Retrieve the current session index.
pub async fn get_session_index<Sender>(
/// Returns the session index expected at any child of the `parent` block.
/// This does not return the session index for the `parent` block.
pub async fn get_session_index_for_child<Sender>(
&mut self,
sender: &mut Sender,
parent: Hash,
Expand All @@ -141,14 +142,14 @@ impl RuntimeInfo {
pub async fn get_session_info<'a, Sender>(
&'a mut self,
sender: &mut Sender,
parent: Hash,
relay_parent: Hash,
) -> Result<&'a ExtendedSessionInfo>
where
Sender: SubsystemSender,
{
let session_index = self.get_session_index(sender, parent).await?;
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;

self.get_session_info_by_index(sender, parent, session_index).await
self.get_session_info_by_index(sender, relay_parent, session_index).await
}

/// Get `ExtendedSessionInfo` by session index.
Expand Down Expand Up @@ -185,7 +186,7 @@ impl RuntimeInfo {
pub async fn check_signature<Sender, Payload, RealPayload>(
&mut self,
sender: &mut Sender,
parent: Hash,
relay_parent: Hash,
signed: UncheckedSigned<Payload, RealPayload>,
) -> Result<
std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>,
Expand All @@ -195,9 +196,9 @@ impl RuntimeInfo {
Payload: EncodeAs<RealPayload> + Clone,
RealPayload: Encode + Clone,
{
let session_index = self.get_session_index(sender, parent).await?;
let info = self.get_session_info_by_index(sender, parent, session_index).await?;
Ok(check_signature(session_index, &info.session_info, parent, signed))
let session_index = self.get_session_index_for_child(sender, relay_parent).await?;
let info = self.get_session_info_by_index(sender, relay_parent, session_index).await?;
Ok(check_signature(session_index, &info.session_info, relay_parent, signed))
}

/// Build `ValidatorInfo` for the current session.
Expand Down