Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
make aleph node compatible with state pruning
  • Loading branch information
maciejnems committed Jan 27, 2023
commit e1ee175353ede02f279f68bf3dc49aac3c25d69d
14 changes: 9 additions & 5 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_blockchain::Backend as _;
use sp_consensus_aura::{sr25519::AuthorityPair as AuraPair, Slot};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, Zero},
traits::{Block as BlockT, Header as HeaderT},
};

use crate::{aleph_cli::AlephCli, chain_spec::DEFAULT_BACKUP_FOLDER, executor::AlephExecutor};
Expand Down Expand Up @@ -311,17 +311,19 @@ pub fn new_authority(
.path(),
);

let finalized = client.info().finalized_number;

let session_period = SessionPeriod(
client
.runtime_api()
.session_period(&BlockId::Number(Zero::zero()))
.session_period(&BlockId::Number(finalized))
.unwrap(),
);

let millisecs_per_block = MillisecsPerBlock(
client
.runtime_api()
.millisecs_per_block(&BlockId::Number(Zero::zero()))
.millisecs_per_block(&BlockId::Number(finalized))
.unwrap(),
);

Expand Down Expand Up @@ -452,17 +454,19 @@ pub fn new_full(
justification_tx,
)?;

let finalized = client.info().finalized_number;

let session_period = SessionPeriod(
client
.runtime_api()
.session_period(&BlockId::Number(Zero::zero()))
.session_period(&BlockId::Number(finalized))
.unwrap(),
);

let millisecs_per_block = MillisecsPerBlock(
client
.runtime_api()
.millisecs_per_block(&BlockId::Number(Zero::zero()))
.millisecs_per_block(&BlockId::Number(finalized))
.unwrap(),
);

Expand Down
3 changes: 2 additions & 1 deletion finality-aleph/src/nodes/nonvalidator_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ where
let map_updater = SessionMapUpdater::<_, _, B>::new(
AuthorityProviderImpl::new(client.clone()),
FinalityNotificatorImpl::new(client.clone()),
session_period,
);
let session_authorities = map_updater.readonly_session_map();
spawn_handle.spawn("aleph/updater", None, async move {
debug!(target: "aleph-party", "SessionMapUpdater has started.");
map_updater.run(session_period).await
map_updater.run().await
});
let (_, handler_task) = setup_justification_handler(JustificationParams {
justification_rx,
Expand Down
3 changes: 2 additions & 1 deletion finality-aleph/src/nodes/validator_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ where
let map_updater = SessionMapUpdater::<_, _, B>::new(
AuthorityProviderImpl::new(client.clone()),
FinalityNotificatorImpl::new(client.clone()),
session_period,
);
let session_authorities = map_updater.readonly_session_map();
spawn_handle.spawn("aleph/updater", None, async move {
debug!(target: "aleph-party", "SessionMapUpdater has started.");
map_updater.run(session_period).await
map_updater.run().await
});

let (authority_justification_tx, handler_task) =
Expand Down
126 changes: 64 additions & 62 deletions finality-aleph/src/session_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::{
generic::BlockId,
traits::{Block, Header, NumberFor},
SaturatedConversion,
};
use tokio::sync::{
oneshot::{Receiver as OneShotReceiver, Sender as OneShotSender},
Expand All @@ -31,6 +30,8 @@ pub trait AuthorityProvider<B> {
}

/// Default implementation of authority provider trait.
/// If state pruning is on and set to `n`, will no longer be able to
/// answer for `num < finalized_number - n`.
pub struct AuthorityProviderImpl<C, B, BE>
where
C: ClientForAleph<B, BE> + Send + Sync + 'static,
Expand Down Expand Up @@ -228,26 +229,6 @@ impl ReadOnlySessionMap {
}
}

fn get_authority_data_for_session<AP, B>(
authority_provider: &AP,
session_id: SessionId,
first_block: NumberFor<B>,
) -> SessionAuthorityData
where
B: Block,
AP: AuthorityProvider<NumberFor<B>>,
{
if session_id == SessionId(0) {
authority_provider
.authority_data(<NumberFor<B>>::saturated_from(0u32))
.expect("Authorities for the session 0 must be available from the beginning")
} else {
authority_provider.next_authority_data(first_block).unwrap_or_else(||
panic!("Authorities for next session {:?} must be available at first block #{:?} of current session", session_id.0, first_block)
)
}
}

/// Struct responsible for updating session map
pub struct SessionMapUpdater<AP, FN, B>
where
Expand All @@ -258,6 +239,7 @@ where
session_map: SharedSessionMap,
authority_provider: AP,
finality_notificator: FN,
period: SessionPeriod,
_phantom: PhantomData<B>,
}

Expand All @@ -267,11 +249,12 @@ where
FN: FinalityNotificator<FinalityNotification<B>, NumberFor<B>>,
B: Block,
{
pub fn new(authority_provider: AP, finality_notificator: FN) -> Self {
pub fn new(authority_provider: AP, finality_notificator: FN, period: SessionPeriod) -> Self {
Self {
session_map: SharedSessionMap::new(),
authority_provider,
finality_notificator,
period,
_phantom: PhantomData,
}
}
Expand All @@ -281,76 +264,95 @@ where
self.session_map.read_only()
}

/// puts authority data for the next session into the session map
async fn handle_first_block_of_session(&mut self, num: NumberFor<B>, session_id: SessionId) {
debug!(target: "aleph-session-updater", "Handling first block #{:?} of session {:?}", num, session_id.0);
let next_session = SessionId(session_id.0 + 1);
let authority_provider = &self.authority_provider;
self.session_map
.update(
next_session,
get_authority_data_for_session::<_, B>(authority_provider, next_session, num),
)
.await;

// if this is the first session we also need to include starting authority data into the map
if session_id.0 == 0 {
let authority_provider = &self.authority_provider;
/// Puts authority data for the next session into the session map
async fn handle_first_block_of_session(&mut self, session_id: SessionId) {
let first_block = first_block_of_session(session_id, self.period);
debug!(target: "aleph-session-updater",
"Handling first block #{:?} of session {:?}",
first_block, session_id.0
);

if let Some(authority_data) = self.authority_provider.next_authority_data(first_block) {
self.session_map
.update(
session_id,
get_authority_data_for_session::<_, B>(authority_provider, session_id, num),
)
.update(SessionId(session_id.0 + 1), authority_data)
.await;
} else {
panic!("Authorities for next session {:?} must be available at first block #{:?} of current session", session_id.0, first_block);
}

if session_id.0 >= PRUNING_THRESHOLD && session_id.0 % PRUNING_THRESHOLD == 0 {
debug!(target: "aleph-session-updater", "Pruning session map below session #{:?}", session_id.0 - PRUNING_THRESHOLD);
if session_id.0 > PRUNING_THRESHOLD && session_id.0 % PRUNING_THRESHOLD == 0 {
debug!(target: "aleph-session-updater",
"Pruning session map below session #{:?}",
session_id.0 - PRUNING_THRESHOLD
);
self.session_map
.prune_below(SessionId(session_id.0 - PRUNING_THRESHOLD))
.await;
}
}

async fn update_session(&mut self, session_id: SessionId, period: SessionPeriod) {
let first_block = first_block_of_session(session_id, period);
self.handle_first_block_of_session(first_block, session_id)
.await;
fn authorities_for_session(&mut self, session_id: SessionId) -> Option<SessionAuthorityData> {
let first_block = first_block_of_session(session_id, self.period);
self.authority_provider.authority_data(first_block)
}

fn catch_up_boundaries(&self, period: SessionPeriod) -> (SessionId, SessionId) {
/// Puts current and next session authorities in the session map.
/// If previous authorities are still available in `AuthorityProvider`, also puts them in the session map.
async fn catch_up(&mut self) -> SessionId {
let last_finalized = self.finality_notificator.last_finalized();

let current_session = session_id_from_block_num(last_finalized, period);
let starting_session = SessionId(current_session.0.saturating_sub(PRUNING_THRESHOLD));
let current_session = session_id_from_block_num(last_finalized, self.period);
let starting_session = SessionId(current_session.0.saturating_sub(PRUNING_THRESHOLD - 1));

(starting_session, current_session)
}
debug!(target: "aleph-session-updater",
"Last finalized is {:?}; Catching up with authorities starting from session {:?} up to next session {:?}",
last_finalized, starting_session.0, current_session.0 + 1
);

pub async fn run(mut self, period: SessionPeriod) {
let mut notifications = self.finality_notificator.notification_stream();
// lets catch up with previous sessions
for session in starting_session.0..current_session.0 {
let id = SessionId(session);
if let Some(authority_data) = self.authorities_for_session(id) {
self.session_map.update(id, authority_data).await;
} else {
debug!(target: "aleph-session-updater", "No authorities for session {:?} during catch-up. Most likely already pruned.", id.0)
}
}

let (starting_session, current_session) = self.catch_up_boundaries(period);
// lets catch up with previous session
match self.authorities_for_session(current_session) {
Some(current_authority_data) => {
self.session_map
.update(current_session, current_authority_data)
.await
}
None => panic!(
"Authorities for current session {:?} must be available from the beginning",
current_session.0
),
};

// lets catch up
for session in starting_session.0..=current_session.0 {
self.update_session(SessionId(session), period).await;
}
self.handle_first_block_of_session(current_session).await;

let mut last_updated = current_session;
current_session
}

pub async fn run(mut self) {
let mut notifications = self.finality_notificator.notification_stream();
let mut last_updated = self.catch_up().await;

while let Some(FinalityNotification { header, .. }) = notifications.next().await {
let last_finalized = header.number();
trace!(target: "aleph-session-updater", "got FinalityNotification about #{:?}", last_finalized);

let session_id = session_id_from_block_num(*last_finalized, period);
let session_id = session_id_from_block_num(*last_finalized, self.period);

if last_updated >= session_id {
continue;
}

for session in (last_updated.0 + 1)..=session_id.0 {
self.update_session(SessionId(session), period).await;
self.handle_first_block_of_session(SessionId(session)).await;
}

last_updated = session_id;
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/sync/substrate/verification/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Display for CacheError {
UnknownAuthorities(session) => {
write!(
f,
"authorities for session {:?} not present on chain even though they should be",
"authorities for session {:?} not present on chain. Most likely state is already pruned",
session
)
}
Expand Down