Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions finality-aleph/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,25 @@ pub trait Finalizer<J: Justification> {
fn finalize(&self, justification: J) -> Result<(), Self::Error>;
}

/// A notification about the chain state changing.
pub enum ChainStateNotification<BI: BlockIdentifier> {
/// A notification about the chain status changing.
pub enum ChainStatusNotification<BI: BlockIdentifier> {
/// A block has been imported.
BlockImported(BI),
/// A block has been finalized.
BlockFinalized(BI),
}

/// A stream of notifications about the chain state in the database changing.
/// A stream of notifications about the chain status in the database changing.
#[async_trait::async_trait]
pub trait ChainStateNotifier<BI: BlockIdentifier> {
/// Returns a chain state notification when it is available.
async fn next(&self) -> ChainStateNotification<BI>;
pub trait ChainStatusNotifier<BI: BlockIdentifier> {
type Error: Display;

/// Returns a chain status notification when it is available.
async fn next(&mut self) -> Result<ChainStatusNotification<BI>, Self::Error>;
}

/// The state of a block in the database.
pub enum BlockState<J: Justification> {
/// The status of a block in the database.
pub enum BlockStatus<J: Justification> {
/// The block is justified and thus finalized.
Justified(J),
/// The block is present, might be finalized if a descendant is justified.
Expand All @@ -84,10 +86,10 @@ pub enum BlockState<J: Justification> {
Unknown,
}

/// The knowledge about the chain state.
pub trait ChainState<J: Justification> {
/// The state of the block.
fn state_of(&self, id: <J::Header as Header>::Identifier) -> BlockState<J>;
/// The knowledge about the chain status.
pub trait ChainStatus<J: Justification> {
/// The status of the block.
fn status_of(&self, id: <J::Header as Header>::Identifier) -> BlockStatus<J>;

/// The header of the best block.
fn best_block(&self) -> J::Header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One};

use crate::sync::{BlockIdentifier, Header};

mod status_notifier;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockId<H: SubstrateHeader<Number = BlockNumber>> {
hash: H::Hash,
Expand Down
78 changes: 78 additions & 0 deletions finality-aleph/src/sync/substrate/status_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use aleph_primitives::BlockNumber;
use futures::StreamExt;
use sc_client_api::client::{FinalityNotifications, ImportNotifications};
use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader};
use tokio::select;

use crate::sync::{substrate::BlockId, ChainStatusNotification, ChainStatusNotifier, Header};

/// What can go wrong when waiting for next chain status notification.
#[derive(Debug)]
pub enum Error {
JustificationStreamClosed,
ImportStreamClosed,
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use Error::*;
match self {
JustificationStreamClosed => {
write!(f, "finalization notification stream has ended")
}
ImportStreamClosed => {
write!(f, "import notification stream has ended")
}
}
}
}

/// Substrate specific implementation of `ChainStatusNotifier`.
pub struct SubstrateChainStatusNotifier<B>
where
B: BlockT,
{
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
}

impl<B> SubstrateChainStatusNotifier<B>
where
B: BlockT,
{
fn new(
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
) -> Self {
Self {
finality_notifications,
import_notifications,
}
}
}

#[async_trait::async_trait]
impl<B> ChainStatusNotifier<BlockId<B::Header>> for SubstrateChainStatusNotifier<B>
where
B: BlockT,
B::Header: SubstrateHeader<Number = BlockNumber>,
{
type Error = Error;

async fn next(&mut self) -> Result<ChainStatusNotification<BlockId<B::Header>>, Self::Error> {
select! {
maybe_block = self.finality_notifications.next() => {
maybe_block
.map(|block| ChainStatusNotification::BlockFinalized(block.header.id()))
.ok_or(Error::JustificationStreamClosed)
},
maybe_block = self.import_notifications.next() => {
maybe_block
.map(|block| ChainStatusNotification::BlockImported(block.header.id()))
.ok_or(Error::ImportStreamClosed)
}
}
}
}