Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions finality-aleph/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::{
mod substrate;
mod ticker;

const LOG_TARGET: &str = "aleph-block-sync";

/// The identifier of a block, the least amount of knowledge we can have about a block.
pub trait BlockIdentifier: Clone + Hash + Debug + Eq {
/// The block number, useful when reasoning about hopeless forks.
Expand Down Expand Up @@ -59,23 +61,25 @@ pub trait Finalizer<J: Justification> {
fn finalize(&self, justification: J) -> Result<(), Self::Error>;
}

/// A notification about the chain state changing.
pub enum ChainStateNotification<BI: BlockIdentifier> {
/// A notification about the chain status changing.
pub enum ChainStatusNotification<BI: BlockIdentifier> {
/// A block has been imported.
BlockImported(BI),
/// A block has been finalized.
BlockFinalized(BI),
}

/// A stream of notifications about the chain state in the database changing.
/// A stream of notifications about the chain status in the database changing.
#[async_trait::async_trait]
pub trait ChainStateNotifier<BI: BlockIdentifier> {
/// Returns a chain state notification when it is available.
async fn next(&self) -> ChainStateNotification<BI>;
pub trait ChainStatusNotifier<BI: BlockIdentifier> {
type Error: Display;

/// Returns a chain status notification when it is available.
async fn next(&mut self) -> Result<ChainStatusNotification<BI>, Self::Error>;
}

/// The state of a block in the database.
pub enum BlockState<J: Justification> {
/// The status of a block in the database.
pub enum BlockStatus<J: Justification> {
/// The block is justified and thus finalized.
Justified(J),
/// The block is present, might be finalized if a descendant is justified.
Expand All @@ -84,10 +88,10 @@ pub enum BlockState<J: Justification> {
Unknown,
}

/// The knowledge about the chain state.
pub trait ChainState<J: Justification> {
/// The state of the block.
fn state_of(&self, id: <J::Header as Header>::Identifier) -> BlockState<J>;
/// The knowledge about the chain status.
pub trait ChainStatus<J: Justification> {
/// The status of the block.
fn status_of(&self, id: <J::Header as Header>::Identifier) -> BlockStatus<J>;

/// The header of the best block.
fn best_block(&self) -> J::Header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One};

use crate::sync::{BlockIdentifier, Header};

mod status_notifier;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockId<H: SubstrateHeader<Number = BlockNumber>> {
hash: H::Hash,
Expand Down
81 changes: 81 additions & 0 deletions finality-aleph/src/sync/substrate/status_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use aleph_primitives::BlockNumber;
use futures::StreamExt;
use sc_client_api::client::{FinalityNotifications, ImportNotifications};
use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader};
use tokio::select;

use crate::sync::{substrate::BlockId, ChainStatusNotification, ChainStatusNotifier, Header};

/// What can go wrong when waiting for next chain status notification.
#[derive(Debug)]
pub enum Error {
JustificationStream,
ImportStream,
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use Error::*;
match self {
JustificationStream => {
write!(f, "finalization notification stream has ended")
}
ImportStream => {
write!(f, "import notification stream has ended")
}
}
}
}

/// Substrate specific implementation of `ChainStatusNotifier`.
pub struct SubstrateChainStatusNotifier<H, B>
where
H: SubstrateHeader<Number = BlockNumber>,
B: BlockT<Header = H>,
{
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
}

impl<H, B> SubstrateChainStatusNotifier<H, B>
where
H: SubstrateHeader<Number = BlockNumber>,
B: BlockT<Header = H>,
{
fn new(
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
) -> Self {
Self {
finality_notifications,
import_notifications,
}
}
}

#[async_trait::async_trait]
impl<H, B> ChainStatusNotifier<BlockId<H>> for SubstrateChainStatusNotifier<H, B>
where
H: SubstrateHeader<Number = BlockNumber>,
B: BlockT<Header = H>,
{
type Error = Error;

/// Returns next chain status notification.
async fn next(&mut self) -> Result<ChainStatusNotification<BlockId<H>>, Self::Error> {
select! {
maybe_block = self.finality_notifications.next() => {
maybe_block
.map(|block| ChainStatusNotification::BlockFinalized(block.header.id()))
.ok_or(Error::JustificationStream)
},
maybe_block = self.import_notifications.next() => {
maybe_block
.map(|block| ChainStatusNotification::BlockImported(block.header.id()))
.ok_or(Error::ImportStream)
}
}
}
}