Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion finality-aleph/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::{
mod substrate;
mod ticker;

const LOG_TARGET: &str = "aleph-block-sync";

/// The identifier of a block, the least amount of knowledge we can have about a block.
pub trait BlockIdentifier: Clone + Hash + Debug + Eq {
/// The block number, useful when reasoning about hopeless forks.
Expand Down Expand Up @@ -70,8 +72,10 @@ pub enum ChainStateNotification<BI: BlockIdentifier> {
/// A stream of notifications about the chain state in the database changing.
#[async_trait::async_trait]
pub trait ChainStateNotifier<BI: BlockIdentifier> {
type Error: Display;

/// Returns a chain state notification when it is available.
async fn next(&self) -> ChainStateNotification<BI>;
async fn next(&mut self) -> Result<ChainStateNotification<BI>, Self::Error>;
}

/// The state of a block in the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One};

use crate::sync::{BlockIdentifier, Header};

mod state_notifier;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockId<H: SubstrateHeader<Number = BlockNumber>> {
hash: H::Hash,
Expand Down
81 changes: 81 additions & 0 deletions finality-aleph/src/sync/substrate/state_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use aleph_primitives::BlockNumber;
use futures::StreamExt;
use sc_client_api::client::{FinalityNotifications, ImportNotifications};
use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader};
use tokio::select;

use crate::sync::{substrate::BlockId, ChainStateNotification, ChainStateNotifier, Header};

/// What can go wrong when waiting for next chain state notification.
#[derive(Debug)]
pub enum Error {
JustificationStream,
ImportStream,
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use Error::*;
match self {
JustificationStream => {
write!(f, "finalization notification stream has ended")
}
ImportStream => {
write!(f, "import notification stream has ended")
}
}
}
}

/// Substrate specific implementation of `ChainStateNotifier`.
pub struct SubstrateChainStateNotifier<H, B>
where
H: SubstrateHeader<Number = BlockNumber>,
B: BlockT<Header = H>,
{
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
}

impl<H, B> SubstrateChainStateNotifier<H, B>
where
H: SubstrateHeader<Number = BlockNumber>,
B: BlockT<Header = H>,
{
fn new(
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
) -> Self {
Self {
finality_notifications,
import_notifications,
}
}
}

#[async_trait::async_trait]
impl<H, B> ChainStateNotifier<BlockId<H>> for SubstrateChainStateNotifier<H, B>
where
H: SubstrateHeader<Number = BlockNumber>,
B: BlockT<Header = H>,
{
type Error = Error;

/// Returns next chain state notification.
async fn next(&mut self) -> Result<ChainStateNotification<BlockId<H>>, Self::Error> {
select! {
maybe_block = self.finality_notifications.next() => {
maybe_block
.map(|block| ChainStateNotification::BlockFinalized(block.header.id()))
.ok_or(Error::JustificationStream)
},
maybe_block = self.import_notifications.next() => {
maybe_block
.map(|block| ChainStateNotification::BlockImported(block.header.id()))
.ok_or(Error::ImportStream)
}
}
}
}