Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

215 changes: 80 additions & 135 deletions core/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>(
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + AuxStore + Send + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
SC: SelectChain<B>,
E: Environment<B, Error=Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error=Error>,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
P: Pair + Send + Sync + 'static,
<E::Proposer as Proposer<B>>::Create: Unpin + Send,
P: Pair + Send + Sync,
P::Public: Hash + Member + Encode + Decode,
P::Signature: Hash + Member + Encode + Decode,
H: Header<Hash=B::Hash>,
E: Environment<B, Error=Error>,
I: BlockImport<B> + Send + Sync + 'static,
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
Expand Down Expand Up @@ -189,143 +189,72 @@ struct AuraWorker<C, E, I, P, SO> {
_key_type: PhantomData<P>,
}

impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where
impl<H, B, C, E, I, P, Error, SO> slots::SimpleSlotWorker<B> for AuraWorker<C, E, I, P, SO> where
B: BlockT<Header=H>,
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + Sync,
C::Api: AuraApi<B, AuthorityId<P>>,
E: Environment<B, Error=Error>,
E::Proposer: Proposer<B, Error=Error>,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
<E::Proposer as Proposer<B>>::Create: Unpin + Send,
H: Header<Hash=B::Hash>,
I: BlockImport<B> + Send + Sync + 'static,
P: Pair + Send + Sync + 'static,
P: Pair + Send + Sync,
P::Public: Member + Encode + Decode + Hash,
P::Signature: Member + Encode + Decode + Hash + Debug,
SO: SyncOracle + Send + Clone,
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
{
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;

fn on_slot(
&mut self,
chain_head: B::Header,
slot_info: SlotInfo,
) -> Self::OnSlot {
let client = self.client.clone();
let block_import = self.block_import.clone();

let (timestamp, slot_num, slot_duration) =
(slot_info.timestamp, slot_info.number, slot_info.duration);

let authorities = match authorities(client.as_ref(), &BlockId::Hash(chain_head.hash())) {
Ok(authorities) => authorities,
Err(e) => {
warn!("Unable to fetch authorities at block {:?}: {:?}", chain_head.hash(), e);

telemetry!(
CONSENSUS_WARN; "aura.unable_fetching_authorities";
"slot" => ?chain_head.hash(),
"err" => ?e,
);
return Box::pin(future::ready(Ok(())));
}
};
type EpochData = Vec<AuthorityId<P>>;
type Claim = P;
type SyncOracle = SO;
type Proposer = E::Proposer;
type BlockImport = I;

fn logging_target(&self) -> &'static str {
"aura"
}

if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 {
debug!(target: "aura", "Skipping proposal slot. Waiting for the network.");
telemetry!(
CONSENSUS_DEBUG;
"aura.skipping_proposal_slot";
"authorities_len" => authorities.len(),
);
return Box::pin(future::ready(Ok(())));
}
let maybe_author = slot_author::<P>(slot_num, &authorities);
let maybe_pair = maybe_author.and_then(|p|
self.keystore.as_ref().and_then(|k|
k.read().key_pair_by_type::<P>(&p, app_crypto::key_types::AURA).ok()
)
);
let proposal_work = match maybe_pair {
None => return Box::pin(future::ready(Ok(()))),
Some(pair) => {
debug!(
target: "aura", "Starting authorship at slot {}; timestamp = {}",
slot_num,
timestamp,
);
telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship";
"slot_num" => slot_num,
"timestamp" => timestamp,
);
fn block_import(&self) -> Arc<Mutex<Self::BlockImport>> {
self.block_import.clone()
}

// we are the slot author. make a block and sign it.
let mut proposer = match self.env.init(&chain_head) {
Ok(p) => p,
Err(e) => {
warn!("Unable to author block in slot {:?}: {:?}", slot_num, e);
telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block";
"slot" => slot_num, "err" => ?e
);
return Box::pin(future::ready(Ok(())))
}
};
fn epoch_data(&self, block: &B::Hash) -> Result<Self::EpochData, consensus_common::Error> {
authorities(self.client.as_ref(), &BlockId::Hash(*block))
}

let remaining_duration = slot_info.remaining_duration();
// deadline our production to approx. the end of the
// slot
futures::future::select(
proposer.propose(
slot_info.inherent_data,
generic::Digest {
logs: vec![
<DigestItemFor<B> as CompatibleDigestItem<P>>::aura_pre_digest(slot_num),
],
},
remaining_duration,
).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()),
Delay::new(remaining_duration)
.map_err(|err| consensus_common::Error::FaultyTimer(err).into())
).map(|v| match v {
futures::future::Either::Left((v, _)) => v.map(|v| (v, pair)),
futures::future::Either::Right((Ok(_), _)) =>
Err(consensus_common::Error::ClientImport("Timeout in the AuRa proposer".into())),
futures::future::Either::Right((Err(err), _)) => Err(err),
})
}
};
fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize {
epoch_data.len()
}

Box::pin(proposal_work.map_ok(move |(b, pair)| {
// minor hack since we don't have access to the timestamp
// that is actually set by the proposer.
let slot_after_building = SignedDuration::default().slot_now(slot_duration);
if slot_after_building != slot_num {
info!("Discarding proposal for slot {}; block production took too long", slot_num);
telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long";
"slot" => slot_num,
);
return
}
fn claim_slot(&self, slot_number: u64, epoch_data: &Self::EpochData) -> Option<Self::Claim> {
let expected_author = slot_author::<P>(slot_number, epoch_data);

let (header, body) = b.deconstruct();
let pre_digest: Result<u64, String> = find_pre_digest::<B, P>(&header);
if let Err(e) = pre_digest {
error!(target: "aura", "FATAL ERROR: Invalid pre-digest: {}!", e);
return
} else {
trace!(target: "aura", "Got correct number of seals. Good!")
};
expected_author.and_then(|p| {
self.keystore.as_ref().and_then(|k| {
k.read().key_pair_by_type::<P>(&p, app_crypto::key_types::AURA).ok()
})
})
}

let header_num = header.number().clone();
let parent_hash = header.parent_hash().clone();
fn pre_digest_data(&self, slot_number: u64, _claim: &Self::Claim) -> Vec<sr_primitives::DigestItem<B::Hash>> {
vec![
<DigestItemFor<B> as CompatibleDigestItem<P>>::aura_pre_digest(slot_number),
]
}

fn import_block(&self) -> Box<dyn Fn(
B::Header,
&B::Hash,
Vec<B::Extrinsic>,
Self::Claim,
) -> consensus_common::BlockImportParams<B> + Send> {
Box::new(|header, header_hash, body, pair| {
// sign the pre-sealed hash of the block and then
// add it to a digest item.
let header_hash = header.hash();
let signature = pair.sign(header_hash.as_ref());
let signature_digest_item = <DigestItemFor<B> as CompatibleDigestItem<P>>::aura_seal(signature);

let import_block: BlockImportParams<B> = BlockImportParams {
BlockImportParams {
origin: BlockOrigin::Own,
header,
justification: None,
Expand All @@ -334,27 +263,44 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
finalized: false,
auxiliary: Vec::new(),
fork_choice: ForkChoiceStrategy::LongestChain,
};
}
})
}

info!("Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
header_num,
import_block.post_header().hash(),
header_hash
);
telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block";
"header_num" => ?header_num,
"hash_now" => ?import_block.post_header().hash(),
"hash_previously" => ?header_hash,
);
fn force_authoring(&self) -> bool {
self.force_authoring
}

if let Err(e) = block_import.lock().import_block(import_block, Default::default()) {
warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e);
fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
&mut self.sync_oracle
}

telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on";
"hash" => ?parent_hash, "err" => ?e,
);
}
}))
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, consensus_common::Error> {
self.env.init(block).map_err(|e| {
consensus_common::Error::ClientImport(format!("{:?}", e)).into()
})
}
}

impl<H, B: BlockT, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where
B: BlockT<Header=H>,
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + Sync + Send,
C::Api: AuraApi<B, AuthorityId<P>>,
E: Environment<B, Error=Error> + Send + Sync,
E::Proposer: Proposer<B, Error=Error>,
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
H: Header<Hash=B::Hash>,
I: BlockImport<B> + Send + Sync + 'static,
P: Pair + Send + Sync,
P::Public: Member + Encode + Decode + Hash,
P::Signature: Member + Encode + Decode + Hash + Debug,
SO: SyncOracle + Send + Sync + Clone,
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
{
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;

fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot {
<Self as slots::SimpleSlotWorker<B>>::on_slot(self, chain_head, slot_info)
}
}

Expand Down Expand Up @@ -384,7 +330,6 @@ fn find_pre_digest<B: BlockT, P: Pair>(header: &B::Header) -> Result<u64, String
pre_digest.ok_or_else(|| aura_err!("No AuRa pre-runtime digest found"))
}


/// check a header has been signed by the right key. If the slot is too far in the future, an error will be returned.
/// if it's successful, returns the pre-header and the digest item containing the seal.
///
Expand Down
Loading