Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
client/beefy: decouple voter init from aux db state load
  • Loading branch information
acatangiu committed Nov 21, 2022
commit 1b0e93e1d5ed8b3b132a34909f4eebb014e73ebf
216 changes: 14 additions & 202 deletions client/beefy/src/aux_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,23 @@

//! Schema for BEEFY state persisted in the aux-db.

use crate::{round::Rounds, worker::PersistedState};
use beefy_primitives::{
crypto::AuthorityId, BeefyApi, ValidatorSet, BEEFY_ENGINE_ID, GENESIS_AUTHORITY_SET_ID,
};
use crate::worker::PersistedState;
use codec::{Decode, Encode};
use futures::{stream::Fuse, StreamExt};
use log::{debug, error, info, trace};
use sc_client_api::{backend::AuxStore, Backend, FinalityNotifications};
use sc_network_gossip::GossipEngine;
use sp_api::{BlockId, HeaderT, ProvideRuntimeApi};
use sp_blockchain::{
Backend as BlockBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
};
use sp_runtime::traits::{Block as BlockT, One, Zero};
use std::collections::VecDeque;
use log::{info, trace};
use sc_client_api::{backend::AuxStore, Backend};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_runtime::traits::Block as BlockT;

const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
const WORKER_STATE: &[u8] = b"beefy_voter_state";

const CURRENT_VERSION: u32 = 1;

pub(crate) fn write_current_version<B: AuxStore>(backend: &B) -> ClientResult<()> {
info!(target: "beefy", "🥩 write aux schema version {:?}", CURRENT_VERSION);
AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])
}

/// Write voter state.
pub(crate) fn write_voter_state<Block: BlockT, B: AuxStore>(
backend: &B,
Expand All @@ -58,206 +54,22 @@ fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<
}

/// Load or initialize persistent data from backend.
pub(crate) async fn load_persistent<B, BE, R>(
backend: &BE,
runtime: &R,
gossip_engine: &mut GossipEngine<B>,
finality: &mut Fuse<FinalityNotifications<B>>,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
pub(crate) fn load_persistent<B, BE>(backend: &BE) -> ClientResult<Option<PersistedState<B>>>
where
B: BlockT,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
let version: Option<u32> = load_decode(backend, VERSION_KEY)?;

match version {
None => (),
Some(1) =>
if let Some(mut state) = load_decode::<_, PersistedState<B>>(backend, WORKER_STATE)? {
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: "beefy", "🥩 Loading BEEFY voter state from db: {:?}.", state);
return Ok(state)
},
Some(1) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE),
other =>
return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
}

migrate_from_version0(backend, runtime, gossip_engine, finality, min_block_delta).await
}

// If no persisted state present, walk back the chain from first GRANDPA notification to either:
// - latest BEEFY finalized block, or if none found on the way,
// - BEEFY pallet genesis;
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
async fn migrate_from_version0<B, BE, R>(
backend: &BE,
runtime: &R,
gossip_engine: &mut GossipEngine<B>,
finality: &mut Fuse<FinalityNotifications<B>>,
min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
B: BlockT,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])?;

let (active, best_grandpa) = wait_for_runtime_pallet(runtime.clone(), gossip_engine, finality)
.await
.ok_or_else(|| ClientError::Backend("Gossip engine has terminated.".into()))?;

debug!(target: "beefy", "🥩 pallet available: header {:?} validator set {:?}", best_grandpa, active);
println!("🥩 pallet available: header {:?} validator set {:?}", best_grandpa, active);

// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
let blockchain = backend.blockchain();
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
println!("🥩 currently looping/looking at {:?}", *header.number());

if let Some(true) = blockchain
.justifications(header.hash())
.ok()
.flatten()
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
{
info!(
target: "beefy",
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
*header.number()
);
println!(
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
*header.number()
);
let best_beefy = *header.number();
// If no session boundaries detected so far, just initialize new rounds here.
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, BlockId::hash(header.hash()))?;
let mut rounds = Rounds::new(best_beefy, active_set);
// Mark the round as already finalized.
rounds.conclude(best_beefy);
sessions.push_front(rounds);
}
let state =
PersistedState::checked_new(best_grandpa, best_beefy, sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
break state
}

// Check if we should move up the chain.
let parent_hash = *header.parent_hash();
if *header.number() == One::one() ||
runtime
.runtime_api()
.validator_set(&BlockId::hash(parent_hash))
.ok()
.flatten()
.is_none()
{
// We've reached pallet genesis, initialize voter here.
let genesis_num = *header.number();
let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))?;
info!(
target: "beefy",
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Starting voting rounds at block {:?}, genesis validator set {:?}.",
genesis_num, genesis_set,
);
println!(
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Starting voting rounds at block {:?}, genesis validator set {:?}.",
genesis_num, genesis_set,
);

sessions.push_front(Rounds::new(genesis_num, genesis_set));
break PersistedState::checked_new(best_grandpa, Zero::zero(), sessions, min_block_delta)
.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
}

if let Some(active) = crate::worker::find_authorities_change::<B>(&header) {
info!(target: "beefy", "🥩 Marking block {:?} as BEEFY Mandatory.", *header.number());
println!("🥩 Marking block {:?} as BEEFY Mandatory.", *header.number());
sessions.push_front(Rounds::new(*header.number(), active));
}

// Move up the chain.
header = blockchain.expect_header(BlockId::Hash(parent_hash))?;
};

write_voter_state(backend, &state)?;
Ok(state)
}

/// Wait for BEEFY runtime pallet to be available, return active validator set.
/// Should be called only once during worker initialization.
async fn wait_for_runtime_pallet<B, R>(
runtime: &R,
mut gossip_engine: &mut GossipEngine<B>,
finality: &mut Fuse<FinalityNotifications<B>>,
) -> Option<(ValidatorSet<AuthorityId>, <B as BlockT>::Header)>
where
B: BlockT,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
loop {
futures::select! {
notif = finality.next() => {
let notif = match notif {
Some(notif) => notif,
None => break
};
let at = BlockId::hash(notif.header.hash());
if let Some(active) = runtime.runtime_api().validator_set(&at).ok().flatten() {
// Beefy pallet available, return active validator set.
return Some((active, notif.header))
} else {
trace!(target: "beefy", "🥩 Finality notification: {:?}", notif);
debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available...");
}
},
_ = gossip_engine => {
break
}
}
}
None
}

fn genesis_sanity_check(active: ValidatorSet<AuthorityId>) -> Option<ValidatorSet<AuthorityId>> {
if active.id() == GENESIS_AUTHORITY_SET_ID {
Some(active)
} else {
error!(target: "beefy", "🥩 Unexpected ID for genesis validator set {:?}.", active);
None
}
}

fn expect_validator_set<B, R>(
runtime: &R,
at: BlockId<B>,
) -> ClientResult<ValidatorSet<AuthorityId>>
where
B: BlockT,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B>,
{
runtime
.runtime_api()
.validator_set(&at)
.ok()
.flatten()
.and_then(genesis_sanity_check)
.ok_or_else(|| ClientError::Backend("BEEFY Genesis sanity check failed.".into()))
// No persistent state found in DB.
Ok(None)
}

#[cfg(test)]
Expand Down
Loading