Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 3755819

Browse files
andresilvabkchr
authored andcommitted
consensus: refactor aura and babe proposer (#3377)
1 parent 36d7a4f commit 3755819

File tree

6 files changed

+370
-289
lines changed

6 files changed

+370
-289
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/consensus/aura/src/lib.rs

Lines changed: 80 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,13 @@ pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>(
145145
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + AuxStore + Send + Sync,
146146
C::Api: AuraApi<B, AuthorityId<P>>,
147147
SC: SelectChain<B>,
148+
E: Environment<B, Error=Error> + Send + Sync + 'static,
148149
E::Proposer: Proposer<B, Error=Error>,
149-
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
150-
P: Pair + Send + Sync + 'static,
150+
<E::Proposer as Proposer<B>>::Create: Unpin + Send,
151+
P: Pair + Send + Sync,
151152
P::Public: Hash + Member + Encode + Decode,
152153
P::Signature: Hash + Member + Encode + Decode,
153154
H: Header<Hash=B::Hash>,
154-
E: Environment<B, Error=Error>,
155155
I: BlockImport<B> + Send + Sync + 'static,
156156
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
157157
SO: SyncOracle + Send + Sync + Clone,
@@ -189,143 +189,72 @@ struct AuraWorker<C, E, I, P, SO> {
189189
_key_type: PhantomData<P>,
190190
}
191191

192-
impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where
192+
impl<H, B, C, E, I, P, Error, SO> slots::SimpleSlotWorker<B> for AuraWorker<C, E, I, P, SO> where
193193
B: BlockT<Header=H>,
194194
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + Sync,
195195
C::Api: AuraApi<B, AuthorityId<P>>,
196196
E: Environment<B, Error=Error>,
197197
E::Proposer: Proposer<B, Error=Error>,
198-
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
198+
<E::Proposer as Proposer<B>>::Create: Unpin + Send,
199199
H: Header<Hash=B::Hash>,
200200
I: BlockImport<B> + Send + Sync + 'static,
201-
P: Pair + Send + Sync + 'static,
201+
P: Pair + Send + Sync,
202202
P::Public: Member + Encode + Decode + Hash,
203203
P::Signature: Member + Encode + Decode + Hash + Debug,
204204
SO: SyncOracle + Send + Clone,
205205
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
206206
{
207-
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;
208-
209-
fn on_slot(
210-
&mut self,
211-
chain_head: B::Header,
212-
slot_info: SlotInfo,
213-
) -> Self::OnSlot {
214-
let client = self.client.clone();
215-
let block_import = self.block_import.clone();
216-
217-
let (timestamp, slot_num, slot_duration) =
218-
(slot_info.timestamp, slot_info.number, slot_info.duration);
219-
220-
let authorities = match authorities(client.as_ref(), &BlockId::Hash(chain_head.hash())) {
221-
Ok(authorities) => authorities,
222-
Err(e) => {
223-
warn!("Unable to fetch authorities at block {:?}: {:?}", chain_head.hash(), e);
224-
225-
telemetry!(
226-
CONSENSUS_WARN; "aura.unable_fetching_authorities";
227-
"slot" => ?chain_head.hash(),
228-
"err" => ?e,
229-
);
230-
return Box::pin(future::ready(Ok(())));
231-
}
232-
};
207+
type EpochData = Vec<AuthorityId<P>>;
208+
type Claim = P;
209+
type SyncOracle = SO;
210+
type Proposer = E::Proposer;
211+
type BlockImport = I;
212+
213+
fn logging_target(&self) -> &'static str {
214+
"aura"
215+
}
233216

234-
if !self.force_authoring && self.sync_oracle.is_offline() && authorities.len() > 1 {
235-
debug!(target: "aura", "Skipping proposal slot. Waiting for the network.");
236-
telemetry!(
237-
CONSENSUS_DEBUG;
238-
"aura.skipping_proposal_slot";
239-
"authorities_len" => authorities.len(),
240-
);
241-
return Box::pin(future::ready(Ok(())));
242-
}
243-
let maybe_author = slot_author::<P>(slot_num, &authorities);
244-
let maybe_pair = maybe_author.and_then(|p|
245-
self.keystore.as_ref().and_then(|k|
246-
k.read().key_pair_by_type::<P>(&p, app_crypto::key_types::AURA).ok()
247-
)
248-
);
249-
let proposal_work = match maybe_pair {
250-
None => return Box::pin(future::ready(Ok(()))),
251-
Some(pair) => {
252-
debug!(
253-
target: "aura", "Starting authorship at slot {}; timestamp = {}",
254-
slot_num,
255-
timestamp,
256-
);
257-
telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship";
258-
"slot_num" => slot_num,
259-
"timestamp" => timestamp,
260-
);
217+
fn block_import(&self) -> Arc<Mutex<Self::BlockImport>> {
218+
self.block_import.clone()
219+
}
261220

262-
// we are the slot author. make a block and sign it.
263-
let mut proposer = match self.env.init(&chain_head) {
264-
Ok(p) => p,
265-
Err(e) => {
266-
warn!("Unable to author block in slot {:?}: {:?}", slot_num, e);
267-
telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block";
268-
"slot" => slot_num, "err" => ?e
269-
);
270-
return Box::pin(future::ready(Ok(())))
271-
}
272-
};
221+
fn epoch_data(&self, block: &B::Hash) -> Result<Self::EpochData, consensus_common::Error> {
222+
authorities(self.client.as_ref(), &BlockId::Hash(*block))
223+
}
273224

274-
let remaining_duration = slot_info.remaining_duration();
275-
// deadline our production to approx. the end of the
276-
// slot
277-
futures::future::select(
278-
proposer.propose(
279-
slot_info.inherent_data,
280-
generic::Digest {
281-
logs: vec![
282-
<DigestItemFor<B> as CompatibleDigestItem<P>>::aura_pre_digest(slot_num),
283-
],
284-
},
285-
remaining_duration,
286-
).map_err(|e| consensus_common::Error::ClientImport(format!("{:?}", e)).into()),
287-
Delay::new(remaining_duration)
288-
.map_err(|err| consensus_common::Error::FaultyTimer(err).into())
289-
).map(|v| match v {
290-
futures::future::Either::Left((v, _)) => v.map(|v| (v, pair)),
291-
futures::future::Either::Right((Ok(_), _)) =>
292-
Err(consensus_common::Error::ClientImport("Timeout in the AuRa proposer".into())),
293-
futures::future::Either::Right((Err(err), _)) => Err(err),
294-
})
295-
}
296-
};
225+
fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize {
226+
epoch_data.len()
227+
}
297228

298-
Box::pin(proposal_work.map_ok(move |(b, pair)| {
299-
// minor hack since we don't have access to the timestamp
300-
// that is actually set by the proposer.
301-
let slot_after_building = SignedDuration::default().slot_now(slot_duration);
302-
if slot_after_building != slot_num {
303-
info!("Discarding proposal for slot {}; block production took too long", slot_num);
304-
telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long";
305-
"slot" => slot_num,
306-
);
307-
return
308-
}
229+
fn claim_slot(&self, slot_number: u64, epoch_data: &Self::EpochData) -> Option<Self::Claim> {
230+
let expected_author = slot_author::<P>(slot_number, epoch_data);
309231

310-
let (header, body) = b.deconstruct();
311-
let pre_digest: Result<u64, String> = find_pre_digest::<B, P>(&header);
312-
if let Err(e) = pre_digest {
313-
error!(target: "aura", "FATAL ERROR: Invalid pre-digest: {}!", e);
314-
return
315-
} else {
316-
trace!(target: "aura", "Got correct number of seals. Good!")
317-
};
232+
expected_author.and_then(|p| {
233+
self.keystore.as_ref().and_then(|k| {
234+
k.read().key_pair_by_type::<P>(&p, app_crypto::key_types::AURA).ok()
235+
})
236+
})
237+
}
318238

319-
let header_num = header.number().clone();
320-
let parent_hash = header.parent_hash().clone();
239+
fn pre_digest_data(&self, slot_number: u64, _claim: &Self::Claim) -> Vec<sr_primitives::DigestItem<B::Hash>> {
240+
vec![
241+
<DigestItemFor<B> as CompatibleDigestItem<P>>::aura_pre_digest(slot_number),
242+
]
243+
}
321244

245+
fn import_block(&self) -> Box<dyn Fn(
246+
B::Header,
247+
&B::Hash,
248+
Vec<B::Extrinsic>,
249+
Self::Claim,
250+
) -> consensus_common::BlockImportParams<B> + Send> {
251+
Box::new(|header, header_hash, body, pair| {
322252
// sign the pre-sealed hash of the block and then
323253
// add it to a digest item.
324-
let header_hash = header.hash();
325254
let signature = pair.sign(header_hash.as_ref());
326255
let signature_digest_item = <DigestItemFor<B> as CompatibleDigestItem<P>>::aura_seal(signature);
327256

328-
let import_block: BlockImportParams<B> = BlockImportParams {
257+
BlockImportParams {
329258
origin: BlockOrigin::Own,
330259
header,
331260
justification: None,
@@ -334,27 +263,44 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
334263
finalized: false,
335264
auxiliary: Vec::new(),
336265
fork_choice: ForkChoiceStrategy::LongestChain,
337-
};
266+
}
267+
})
268+
}
338269

339-
info!("Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
340-
header_num,
341-
import_block.post_header().hash(),
342-
header_hash
343-
);
344-
telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block";
345-
"header_num" => ?header_num,
346-
"hash_now" => ?import_block.post_header().hash(),
347-
"hash_previously" => ?header_hash,
348-
);
270+
fn force_authoring(&self) -> bool {
271+
self.force_authoring
272+
}
349273

350-
if let Err(e) = block_import.lock().import_block(import_block, Default::default()) {
351-
warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e);
274+
fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
275+
&mut self.sync_oracle
276+
}
352277

353-
telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on";
354-
"hash" => ?parent_hash, "err" => ?e,
355-
);
356-
}
357-
}))
278+
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, consensus_common::Error> {
279+
self.env.init(block).map_err(|e| {
280+
consensus_common::Error::ClientImport(format!("{:?}", e)).into()
281+
})
282+
}
283+
}
284+
285+
impl<H, B: BlockT, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> where
286+
B: BlockT<Header=H>,
287+
C: ProvideRuntimeApi + BlockOf + ProvideCache<B> + Sync + Send,
288+
C::Api: AuraApi<B, AuthorityId<P>>,
289+
E: Environment<B, Error=Error> + Send + Sync,
290+
E::Proposer: Proposer<B, Error=Error>,
291+
<E::Proposer as Proposer<B>>::Create: Unpin + Send + 'static,
292+
H: Header<Hash=B::Hash>,
293+
I: BlockImport<B> + Send + Sync + 'static,
294+
P: Pair + Send + Sync,
295+
P::Public: Member + Encode + Decode + Hash,
296+
P::Signature: Member + Encode + Decode + Hash + Debug,
297+
SO: SyncOracle + Send + Sync + Clone,
298+
Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
299+
{
300+
type OnSlot = Pin<Box<dyn Future<Output = Result<(), consensus_common::Error>> + Send>>;
301+
302+
fn on_slot(&mut self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot {
303+
<Self as slots::SimpleSlotWorker<B>>::on_slot(self, chain_head, slot_info)
358304
}
359305
}
360306

@@ -384,7 +330,6 @@ fn find_pre_digest<B: BlockT, P: Pair>(header: &B::Header) -> Result<u64, String
384330
pre_digest.ok_or_else(|| aura_err!("No AuRa pre-runtime digest found"))
385331
}
386332

387-
388333
/// check a header has been signed by the right key. If the slot is too far in the future, an error will be returned.
389334
/// if it's successful, returns the pre-header and the digest item containing the seal.
390335
///

0 commit comments

Comments
 (0)