diff --git a/Cargo.lock b/Cargo.lock index a207eb8ad9577..5e12dcd427b15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4544,6 +4544,7 @@ dependencies = [ "substrate-consensus-common 2.0.0", "substrate-inherents 2.0.0", "substrate-primitives 2.0.0", + "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", ] diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index fa5b0533b61e2..8f52120ad52df 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -145,13 +145,13 @@ pub fn start_aura( C: ProvideRuntimeApi + BlockOf + ProvideCache + AuxStore + Send + Sync, C::Api: AuraApi>, SC: SelectChain, + E: Environment + Send + Sync + 'static, E::Proposer: Proposer, - >::Create: Unpin + Send + 'static, - P: Pair + Send + Sync + 'static, + >::Create: Unpin + Send, + P: Pair + Send + Sync, P::Public: Hash + Member + Encode + Decode, P::Signature: Hash + Member + Encode + Decode, H: Header, - E: Environment, I: BlockImport + Send + Sync + 'static, Error: ::std::error::Error + Send + From<::consensus_common::Error> + From + 'static, SO: SyncOracle + Send + Sync + Clone, @@ -189,143 +189,72 @@ struct AuraWorker { _key_type: PhantomData

, } -impl SlotWorker for AuraWorker where +impl slots::SimpleSlotWorker for AuraWorker where B: BlockT, C: ProvideRuntimeApi + BlockOf + ProvideCache + Sync, C::Api: AuraApi>, E: Environment, E::Proposer: Proposer, - >::Create: Unpin + Send + 'static, + >::Create: Unpin + Send, H: Header, I: BlockImport + Send + Sync + 'static, - P: Pair + Send + Sync + 'static, + P: Pair + Send + Sync, P::Public: Member + Encode + Decode + Hash, P::Signature: Member + Encode + Decode + Hash + Debug, SO: SyncOracle + Send + Clone, Error: ::std::error::Error + Send + From<::consensus_common::Error> + From + 'static, { - type OnSlot = Pin> + Send>>; - - fn on_slot( - &mut self, - chain_head: B::Header, - slot_info: SlotInfo, - ) -> Self::OnSlot { - let client = self.client.clone(); - let block_import = self.block_import.clone(); - - let (timestamp, slot_num, slot_duration) = - (slot_info.timestamp, slot_info.number, slot_info.duration); - - let authorities = match authorities(client.as_ref(), &BlockId::Hash(chain_head.hash())) { - Ok(authorities) => authorities, - Err(e) => { - warn!("Unable to fetch authorities at block {:?}: {:?}", chain_head.hash(), e); - - telemetry!( - CONSENSUS_WARN; "aura.unable_fetching_authorities"; - "slot" => ?chain_head.hash(), - "err" => ?e, - ); - return Box::pin(future::ready(Ok(()))); - } - }; + type EpochData = Vec>; + type Claim = P; + type SyncOracle = SO; + type Proposer = E::Proposer; + type BlockImport = I; + + fn logging_target(&self) -> &'static str { + "aura" + } - if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 { - debug!(target: "aura", "Skipping proposal slot. Waiting for the network."); - telemetry!( - CONSENSUS_DEBUG; - "aura.skipping_proposal_slot"; - "authorities_len" => authorities.len(), - ); - return Box::pin(future::ready(Ok(()))); - } - let maybe_author = slot_author::

(slot_num, &authorities); - let maybe_pair = maybe_author.and_then(|p| - self.keystore.as_ref().and_then(|k| - k.read().key_pair_by_type::

(&p, app_crypto::key_types::AURA).ok() - ) - ); - let proposal_work = match maybe_pair { - None => return Box::pin(future::ready(Ok(()))), - Some(pair) => { - debug!( - target: "aura", "Starting authorship at slot {}; timestamp = {}", - slot_num, - timestamp, - ); - telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship"; - "slot_num" => slot_num, - "timestamp" => timestamp, - ); + fn block_import(&self) -> Arc> { + self.block_import.clone() + } - // we are the slot author. make a block and sign it. - let mut proposer = match self.env.init(&chain_head) { - Ok(p) => p, - Err(e) => { - warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); - telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block"; - "slot" => slot_num, "err" => ?e - ); - return Box::pin(future::ready(Ok(()))) - } - }; + fn epoch_data(&self, block: &B::Hash) -> Result { + authorities(self.client.as_ref(), &BlockId::Hash(*block)) + } - let remaining_duration = slot_info.remaining_duration(); - // deadline our production to approx. the end of the - // slot - futures::future::select( - proposer.propose( - slot_info.inherent_data, - generic::Digest { - logs: vec![ - as CompatibleDigestItem

>::aura_pre_digest(slot_num), - ], - }, - remaining_duration, - ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()), - Delay::new(remaining_duration) - .map_err(|err| consensus_common::Error::FaultyTimer(err).into()) - ).map(|v| match v { - futures::future::Either::Left((v, _)) => v.map(|v| (v, pair)), - futures::future::Either::Right((Ok(_), _)) => - Err(consensus_common::Error::ClientImport("Timeout in the AuRa proposer".into())), - futures::future::Either::Right((Err(err), _)) => Err(err), - }) - } - }; + fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize { + epoch_data.len() + } - Box::pin(proposal_work.map_ok(move |(b, pair)| { - // minor hack since we don't have access to the timestamp - // that is actually set by the proposer. - let slot_after_building = SignedDuration::default().slot_now(slot_duration); - if slot_after_building != slot_num { - info!("Discarding proposal for slot {}; block production took too long", slot_num); - telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long"; - "slot" => slot_num, - ); - return - } + fn claim_slot(&self, slot_number: u64, epoch_data: &Self::EpochData) -> Option { + let expected_author = slot_author::

(slot_number, epoch_data); - let (header, body) = b.deconstruct(); - let pre_digest: Result = find_pre_digest::(&header); - if let Err(e) = pre_digest { - error!(target: "aura", "FATAL ERROR: Invalid pre-digest: {}!", e); - return - } else { - trace!(target: "aura", "Got correct number of seals. Good!") - }; + expected_author.and_then(|p| { + self.keystore.as_ref().and_then(|k| { + k.read().key_pair_by_type::

(&p, app_crypto::key_types::AURA).ok() + }) + }) + } - let header_num = header.number().clone(); - let parent_hash = header.parent_hash().clone(); + fn pre_digest_data(&self, slot_number: u64, _claim: &Self::Claim) -> Vec> { + vec![ + as CompatibleDigestItem

>::aura_pre_digest(slot_number), + ] + } + fn import_block(&self) -> Box, + Self::Claim, + ) -> consensus_common::BlockImportParams + Send> { + Box::new(|header, header_hash, body, pair| { // sign the pre-sealed hash of the block and then // add it to a digest item. - let header_hash = header.hash(); let signature = pair.sign(header_hash.as_ref()); let signature_digest_item = as CompatibleDigestItem

>::aura_seal(signature); - let import_block: BlockImportParams = BlockImportParams { + BlockImportParams { origin: BlockOrigin::Own, header, justification: None, @@ -334,27 +263,44 @@ impl SlotWorker for AuraWorker w finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, - }; + } + }) + } - info!("Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", - header_num, - import_block.post_header().hash(), - header_hash - ); - telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block"; - "header_num" => ?header_num, - "hash_now" => ?import_block.post_header().hash(), - "hash_previously" => ?header_hash, - ); + fn force_authoring(&self) -> bool { + self.force_authoring + } - if let Err(e) = block_import.lock().import_block(import_block, Default::default()) { - warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e); + fn sync_oracle(&mut self) -> &mut Self::SyncOracle { + &mut self.sync_oracle + } - telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on"; - "hash" => ?parent_hash, "err" => ?e, - ); - } - })) + fn proposer(&mut self, block: &B::Header) -> Result { + self.env.init(block).map_err(|e| { + consensus_common::Error::ClientImport(format!("{:?}", e)).into() + }) + } +} + +impl SlotWorker for AuraWorker where + B: BlockT, + C: ProvideRuntimeApi + BlockOf + ProvideCache + Sync + Send, + C::Api: AuraApi>, + E: Environment + Send + Sync, + E::Proposer: Proposer, + >::Create: Unpin + Send + 'static, + H: Header, + I: BlockImport + Send + Sync + 'static, + P: Pair + Send + Sync, + P::Public: Member + Encode + Decode + Hash, + P::Signature: Member + Encode + Decode + Hash + Debug, + SO: SyncOracle + Send + Sync + Clone, + Error: ::std::error::Error + Send + From<::consensus_common::Error> + From + 'static, +{ + type OnSlot = Pin> + Send>>; + + fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot { + >::on_slot(self, chain_head, slot_info) } } @@ -384,7 +330,6 @@ fn find_pre_digest(header: &B::Header) -> Result(BabeParams { C: ProvideRuntimeApi + ProvideCache + ProvideUncles + Send + Sync + 'static, C::Api: BabeApi, SC: SelectChain + 'static, + E: Environment + Send + Sync, E::Proposer: Proposer, >::Create: Unpin + Send + 'static, H: Header, - E: Environment, I: BlockImport + Send + Sync + 'static, Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static, SO: SyncOracle + Send + Sync + Clone, @@ -229,155 +229,77 @@ struct BabeWorker { keystore: KeyStorePtr, } -impl SlotWorker for BabeWorker where - B: BlockT, +impl slots::SimpleSlotWorker for BabeWorker where + B: BlockT, C: ProvideRuntimeApi + ProvideCache, C::Api: BabeApi, E: Environment, E::Proposer: Proposer, >::Create: Unpin + Send + 'static, - Hash: Debug + Eq + Copy + SimpleBitOps + Encode + Decode + Serialize + - for<'de> Deserialize<'de> + Debug + Default + AsRef<[u8]> + AsMut<[u8]> + - std::hash::Hash + Display + Send + Sync + 'static, H: Header, I: BlockImport + Send + Sync + 'static, SO: SyncOracle + Send + Clone, Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static, { - type OnSlot = Pin> + Send>>; - - fn on_slot( - &mut self, - chain_head: B::Header, - slot_info: SlotInfo, - ) -> Self::OnSlot { - let ref client = self.client; - let block_import = self.block_import.clone(); - - let (timestamp, slot_number, slot_duration) = - (slot_info.timestamp, slot_info.number, slot_info.duration); - - let epoch = match epoch(client.as_ref(), &BlockId::Hash(chain_head.hash())) { - Ok(authorities) => authorities, - Err(e) => { - error!( - target: "babe", - "Unable to fetch authorities at block {:?}: {:?}", - chain_head.hash(), - e - ); - telemetry!(CONSENSUS_WARN; "babe.unable_fetching_authorities"; - "slot" => ?chain_head.hash(), "err" => ?e - ); - return Box::pin(future::ready(Ok(()))); - } - }; + type EpochData = Epoch; + type Claim = (VRFInOut, VRFProof, u32, AuthorityPair); + type SyncOracle = SO; + type Proposer = E::Proposer; + type BlockImport = I; + + fn logging_target(&self) -> &'static str { + "babe" + } - let Epoch { ref authorities, .. } = epoch; + fn block_import(&self) -> Arc> { + self.block_import.clone() + } - if authorities.is_empty() { - error!(target: "babe", "No authorities at block {:?}", chain_head.hash()); - } + fn epoch_data(&self, block: &B::Hash) -> Result { + epoch(self.client.as_ref(), &BlockId::Hash(*block)) + } - if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 { - debug!(target: "babe", "Skipping proposal slot. Waiting for the network."); - telemetry!(CONSENSUS_DEBUG; "babe.skipping_proposal_slot"; - "authorities_len" => authorities.len() - ); - return Box::pin(future::ready(Ok(()))); - } + fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize { + epoch_data.authorities.len() + } - let proposal_work = if let Some(claim) = claim_slot( - slot_info.number, - epoch, + fn claim_slot(&self, slot_number: u64, epoch_data: &Self::EpochData) -> Option { + claim_slot( + slot_number, + epoch_data, self.c, &self.keystore, - ) { - let ((inout, vrf_proof, _batchable_proof), authority_index, key) = claim; - - debug!( - target: "babe", "Starting authorship at slot {}; timestamp = {}", - slot_number, - timestamp, - ); - telemetry!(CONSENSUS_DEBUG; "babe.starting_authorship"; - "slot_number" => slot_number, "timestamp" => timestamp - ); - - // we are the slot author. make a block and sign it. - let mut proposer = match self.env.init(&chain_head) { - Ok(p) => p, - Err(e) => { - warn!(target: "babe", - "Unable to author block in slot {:?}: {:?}", - slot_number, - e, - ); - telemetry!(CONSENSUS_WARN; "babe.unable_authoring_block"; - "slot" => slot_number, "err" => ?e - ); - return Box::pin(future::ready(Ok(()))) - } - }; - - let inherent_digest = BabePreDigest { - vrf_proof, - vrf_output: inout.to_output(), - authority_index: authority_index as u32, - slot_number, - }; + ).map(|((inout, vrf_proof, _), authority_index, key)| { + (inout, vrf_proof, authority_index as u32, key) + }) + } - // deadline our production to approx. the end of the slot - let remaining_duration = slot_info.remaining_duration(); - futures::future::select( - proposer.propose( - slot_info.inherent_data, - generic::Digest { - logs: vec![ - generic::DigestItem::babe_pre_digest(inherent_digest.clone()), - ], - }, - remaining_duration, - ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()), - Delay::new(remaining_duration) - .map_err(|err| consensus_common::Error::FaultyTimer(err).into()) - ).map(|v| match v { - futures::future::Either::Left((v, _)) => v.map(|v| (v, key)), - futures::future::Either::Right((Ok(_), _)) => - Err(consensus_common::Error::ClientImport("Timeout in the BaBe proposer".into())), - futures::future::Either::Right((Err(err), _)) => Err(err), - }) - } else { - return Box::pin(future::ready(Ok(()))); + fn pre_digest_data(&self, slot_number: u64, claim: &Self::Claim) -> Vec> { + let inherent_digest = BabePreDigest { + vrf_proof: claim.1.clone(), + vrf_output: claim.0.to_output(), + authority_index: claim.2, + slot_number, }; - Box::pin(proposal_work.map_ok(move |(b, key)| { - // minor hack since we don't have access to the timestamp - // that is actually set by the proposer. - let slot_after_building = SignedDuration::default().slot_now(slot_duration); - if slot_after_building != slot_number { - info!( - target: "babe", - "Discarding proposal for slot {}; block production took too long", - slot_number - ); - telemetry!(CONSENSUS_INFO; "babe.discarding_proposal_took_too_long"; - "slot" => slot_number - ); - return; - } - - let (header, body) = b.deconstruct(); - let header_num = header.number().clone(); - let parent_hash = header.parent_hash().clone(); + vec![ + as CompatibleDigestItem>::babe_pre_digest(inherent_digest), + ] + } + fn import_block(&self) -> Box, + Self::Claim, + ) -> consensus_common::BlockImportParams + Send> { + Box::new(|header, header_hash, body, (_, _, _, pair)| { // sign the pre-sealed hash of the block and then // add it to a digest item. - let header_hash = header.hash(); - let signature = key.sign(header_hash.as_ref()); - let signature_digest_item = DigestItemFor::::babe_seal(signature); + let signature = pair.sign(header_hash.as_ref()); + let signature_digest_item = as CompatibleDigestItem>::babe_seal(signature); - let import_block = BlockImportParams:: { + BlockImportParams { origin: BlockOrigin::Own, header, justification: None, @@ -386,29 +308,41 @@ impl SlotWorker for BabeWorker w finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, - }; + } + }) + } - info!(target: "babe", - "Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", - header_num, - import_block.post_header().hash(), - header_hash, - ); + fn force_authoring(&self) -> bool { + self.force_authoring + } - telemetry!(CONSENSUS_INFO; "babe.pre_sealed_block"; - "header_num" => ?header_num, - "hash_now" => ?import_block.post_header().hash(), - "hash_previously" => ?header_hash, - ); + fn sync_oracle(&mut self) -> &mut Self::SyncOracle { + &mut self.sync_oracle + } - if let Err(e) = block_import.lock().import_block(import_block, Default::default()) { - warn!(target: "babe", "Error with block built on {:?}: {:?}", - parent_hash, e); - telemetry!(CONSENSUS_WARN; "babe.err_with_block_built_on"; - "hash" => ?parent_hash, "err" => ?e - ); - } - })) + fn proposer(&mut self, block: &B::Header) -> Result { + self.env.init(block).map_err(|e| { + consensus_common::Error::ClientImport(format!("{:?}", e)).into() + }) + } +} + +impl SlotWorker for BabeWorker where + B: BlockT, + C: ProvideRuntimeApi + ProvideCache + Send + Sync, + C::Api: BabeApi, + E: Environment + Send + Sync, + E::Proposer: Proposer, + >::Create: Unpin + Send + 'static, + H: Header, + I: BlockImport + Send + Sync + 'static, + SO: SyncOracle + Send + Sync + Clone, + Error: std::error::Error + Send + From<::consensus_common::Error> + From + 'static, +{ + type OnSlot = Pin> + Send>>; + + fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot { + >::on_slot(self, chain_head, slot_info) } } @@ -825,7 +759,7 @@ fn calculate_threshold( /// so it returns `Some(_)`. Otherwise, it returns `None`. fn claim_slot( slot_number: u64, - Epoch { ref authorities, ref randomness, epoch_index, .. }: Epoch, + Epoch { authorities, randomness, epoch_index, .. }: &Epoch, c: (u64, u64), keystore: &KeyStorePtr, ) -> Option<((VRFInOut, VRFProof, VRFProofBatchable), usize, AuthorityPair)> { @@ -835,7 +769,7 @@ fn claim_slot( .find_map(|(i, a)| { keystore.key_pair::(&a.0).ok().map(|kp| (kp, i)) })?; - let transcript = make_transcript(randomness, slot_number, epoch_index); + let transcript = make_transcript(randomness, slot_number, *epoch_index); // Compute the threshold we will use. // @@ -1219,7 +1153,7 @@ pub mod test_helpers { super::claim_slot( slot_number, - epoch, + &epoch, c, keystore, ).map(|((inout, vrf_proof, _), authority_index, _)| { diff --git a/core/consensus/babe/src/tests.rs b/core/consensus/babe/src/tests.rs index eaa4fbe099914..af60ec3d6b4c2 100644 --- a/core/consensus/babe/src/tests.rs +++ b/core/consensus/babe/src/tests.rs @@ -324,7 +324,7 @@ fn can_author_block() { duration: 100, }; loop { - match claim_slot(i, epoch.clone(), (3, 10), &keystore) { + match claim_slot(i, &epoch.clone(), (3, 10), &keystore) { None => i += 1, Some(s) => { debug!(target: "babe", "Authored block {:?}", s.0); diff --git a/core/consensus/slots/Cargo.toml b/core/consensus/slots/Cargo.toml index f74837a62f9de..4203da9d7d7f3 100644 --- a/core/consensus/slots/Cargo.toml +++ b/core/consensus/slots/Cargo.toml @@ -10,6 +10,7 @@ codec = { package = "parity-scale-codec", version = "1.0.0" } client = { package = "substrate-client", path = "../../client" } primitives = { package = "substrate-primitives", path = "../../primitives" } sr-primitives = { path = "../../sr-primitives" } +substrate-telemetry = { path = "../../telemetry" } consensus_common = { package = "substrate-consensus-common", path = "../common" } inherents = { package = "substrate-inherents", path = "../../inherents" } futures-preview = "=0.3.0-alpha.17" diff --git a/core/consensus/slots/src/lib.rs b/core/consensus/slots/src/lib.rs index fc0134f746af7..d28510c8c60f9 100644 --- a/core/consensus/slots/src/lib.rs +++ b/core/consensus/slots/src/lib.rs @@ -31,13 +31,16 @@ use slots::Slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; use codec::{Decode, Encode}; -use consensus_common::{SyncOracle, SelectChain}; +use consensus_common::{BlockImport, Proposer, SyncOracle, SelectChain}; use futures::{prelude::*, future::{self, Either}}; +use futures_timer::Delay; use inherents::{InherentData, InherentDataProviders}; use log::{debug, error, info, warn}; use sr_primitives::generic::BlockId; -use sr_primitives::traits::{ApiRef, Block as BlockT, ProvideRuntimeApi}; -use std::{fmt::Debug, ops::Deref}; +use sr_primitives::traits::{ApiRef, Block as BlockT, Header, ProvideRuntimeApi}; +use std::{fmt::Debug, ops::Deref, pin::Pin, sync::Arc}; +use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO}; +use parking_lot::Mutex; /// A worker that should be invoked at every new slot. pub trait SlotWorker { @@ -49,6 +52,203 @@ pub trait SlotWorker { fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot; } +/// A skeleton implementation for `SlotWorker` which tries to claim a slot at at +/// its beginning and tries to produce a block if successfully claimed, timing +/// out if block production takes too long. +pub trait SimpleSlotWorker { + /// A handle to a `BlockImport`. + type BlockImport: BlockImport + Send + 'static; + + /// A handle to a `SyncOracle`. + type SyncOracle: SyncOracle; + + /// The type of proposer to use to build blocks. + type Proposer: Proposer; + + /// Data associated with a slot claim. + type Claim: Send + 'static; + + /// Epoch data necessary for authoring. + type EpochData; + + /// The logging target to use when logging messages. + fn logging_target(&self) -> &'static str; + + /// A handle to a `BlockImport`. + fn block_import(&self) -> Arc>; + + /// Returns the epoch data necessary for authoring. + fn epoch_data(&self, block: &B::Hash) -> Result; + + /// Returns the number of authorities given the epoch data. + fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize; + + /// Tries to claim the given slot, returning an object with claim data if successful. + fn claim_slot(&self, slot_number: u64, epoch_data: &Self::EpochData) -> Option; + + /// Return the pre digest data to include in a block authored with the given claim. + fn pre_digest_data(&self, slot_number: u64, claim: &Self::Claim) -> Vec>; + + /// Returns a function which produces a `BlockImportParams`. + fn import_block(&self) -> Box, + Self::Claim, + ) -> consensus_common::BlockImportParams + Send>; + + /// Whether to force authoring if offline. + fn force_authoring(&self) -> bool; + + /// Returns a handle to a `SyncOracle`. + fn sync_oracle(&mut self) -> &mut Self::SyncOracle; + + /// Returns a `Proposer` to author on top of the given block. + fn proposer(&mut self, block: &B::Header) -> Result; + + /// Implements the `on_slot` functionality from `SlotWorker`. + fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) + -> Pin> + Send>> where + Self: Send + Sync, + >::Create: Unpin + Send + 'static, + { + let (timestamp, slot_number, slot_duration) = + (slot_info.timestamp, slot_info.number, slot_info.duration); + + let epoch_data = match self.epoch_data(&chain_head.hash()) { + Ok(epoch_data) => epoch_data, + Err(err) => { + warn!("Unable to fetch epoch data at block {:?}: {:?}", chain_head.hash(), err); + + telemetry!( + CONSENSUS_WARN; "slots.unable_fetching_authorities"; + "slot" => ?chain_head.hash(), + "err" => ?err, + ); + + return Box::pin(future::ready(Ok(()))); + } + }; + + let authorities_len = self.authorities_len(&epoch_data); + + if !self.force_authoring() && self.sync_oracle().is_offline() && authorities_len > 1 { + debug!(target: self.logging_target(), "Skipping proposal slot. Waiting for the network."); + telemetry!( + CONSENSUS_DEBUG; + "slots.skipping_proposal_slot"; + "authorities_len" => authorities_len, + ); + + return Box::pin(future::ready(Ok(()))); + } + + let claim = match self.claim_slot(slot_number, &epoch_data) { + None => return Box::pin(future::ready(Ok(()))), + Some(claim) => claim, + }; + + debug!( + target: self.logging_target(), "Starting authorship at slot {}; timestamp = {}", + slot_number, + timestamp, + ); + + telemetry!(CONSENSUS_DEBUG; "slots.starting_authorship"; + "slot_num" => slot_number, + "timestamp" => timestamp, + ); + + let mut proposer = match self.proposer(&chain_head) { + Ok(proposer) => proposer, + Err(err) => { + warn!("Unable to author block in slot {:?}: {:?}", slot_number, err); + + telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block"; + "slot" => slot_number, "err" => ?err + ); + + return Box::pin(future::ready(Ok(()))); + }, + }; + + let remaining_duration = slot_info.remaining_duration(); + let logs = self.pre_digest_data(slot_number, &claim); + + // deadline our production to approx. the end of the slot + let proposal_work = futures::future::select( + proposer.propose( + slot_info.inherent_data, + sr_primitives::generic::Digest { + logs, + }, + remaining_duration, + ).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()), + Delay::new(remaining_duration) + .map_err(|err| consensus_common::Error::FaultyTimer(err).into()) + ).map(|v| match v { + futures::future::Either::Left((b, _)) => b.map(|b| (b, claim)), + futures::future::Either::Right((Ok(_), _)) => + Err(consensus_common::Error::ClientImport("Timeout in the Slots proposer".into())), + futures::future::Either::Right((Err(err), _)) => Err(err), + }); + + let import_block = self.import_block(); + let block_import = self.block_import(); + let logging_target = self.logging_target(); + + Box::pin(proposal_work.map_ok(move |(block, claim)| { + // minor hack since we don't have access to the timestamp + // that is actually set by the proposer. + let slot_after_building = SignedDuration::default().slot_now(slot_duration); + if slot_after_building != slot_number { + info!("Discarding proposal for slot {}; block production took too long", slot_number); + telemetry!(CONSENSUS_INFO; "slots.discarding_proposal_took_too_long"; + "slot" => slot_number, + ); + + return; + } + + let (header, body) = block.deconstruct(); + let header_num = header.number().clone(); + let header_hash = header.hash(); + let parent_hash = header.parent_hash().clone(); + + let import_block = import_block( + header, + &header_hash, + body, + claim, + ); + + info!("Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", + header_num, + import_block.post_header().hash(), + header_hash, + ); + + telemetry!(CONSENSUS_INFO; "slots.pre_sealed_block"; + "header_num" => ?header_num, + "hash_now" => ?import_block.post_header().hash(), + "hash_previously" => ?header_hash, + ); + + if let Err(err) = block_import.lock().import_block(import_block, Default::default()) { + warn!(target: logging_target, + "Error with block built on {:?}: {:?}", + parent_hash, + err, + ); + + telemetry!(CONSENSUS_WARN; "slots.err_with_block_built_on"; + "hash" => ?parent_hash, "err" => ?err, + ); + } + })) + } +} + /// Slot compatible inherent data. pub trait SlotCompatible { /// Extract timestamp and slot from inherent data.