@@ -25,11 +25,10 @@ use std::{
2525} ;
2626
2727use codec:: { Codec , Decode , Encode } ;
28- use futures:: { future , FutureExt , StreamExt } ;
28+ use futures:: StreamExt ;
2929use log:: { debug, error, info, log_enabled, trace, warn} ;
30- use parking_lot:: Mutex ;
3130
32- use sc_client_api:: { Backend , FinalityNotification , FinalityNotifications } ;
31+ use sc_client_api:: { Backend , FinalityNotification } ;
3332use sc_network_gossip:: GossipEngine ;
3433
3534use sp_api:: { BlockId , ProvideRuntimeApi } ;
@@ -80,15 +79,14 @@ pub(crate) struct BeefyWorker<B: Block, BE, C, R, SO> {
8079 runtime : Arc < R > ,
8180 key_store : BeefyKeystore ,
8281 signed_commitment_sender : BeefySignedCommitmentSender < B > ,
83- gossip_engine : Arc < Mutex < GossipEngine < B > > > ,
82+ gossip_engine : GossipEngine < B > ,
8483 gossip_validator : Arc < GossipValidator < B > > ,
8584 /// Min delta in block numbers between two blocks, BEEFY should vote on
8685 min_block_delta : u32 ,
8786 metrics : Option < Metrics > ,
8887 rounds : Option < Rounds < Payload , B > > ,
8988 /// Buffer holding votes for blocks that the client hasn't seen finality for.
9089 pending_votes : BTreeMap < NumberFor < B > , Vec < VoteMessage < NumberFor < B > , AuthorityId , Signature > > > ,
91- finality_notifications : FinalityNotifications < B > ,
9290 /// Best block we received a GRANDPA notification for
9391 best_grandpa_block_header : <B as Block >:: Header ,
9492 /// Best block a BEEFY voting round has been concluded for
@@ -143,14 +141,13 @@ where
143141 runtime,
144142 key_store,
145143 signed_commitment_sender,
146- gossip_engine : Arc :: new ( Mutex :: new ( gossip_engine ) ) ,
144+ gossip_engine,
147145 gossip_validator,
148146 // always target at least one block better than current best beefy
149147 min_block_delta : min_block_delta. max ( 1 ) ,
150148 metrics,
151149 rounds : None ,
152150 pending_votes : BTreeMap :: new ( ) ,
153- finality_notifications : client. finality_notification_stream ( ) ,
154151 best_grandpa_block_header : last_finalized_header,
155152 best_beefy_block : None ,
156153 last_signed_id : 0 ,
@@ -471,15 +468,21 @@ where
471468 true ,
472469 ) ;
473470
474- self . gossip_engine . lock ( ) . gossip_message ( topic :: < B > ( ) , encoded_message, false ) ;
471+ self . gossip_engine . gossip_message ( topic :: < B > ( ) , encoded_message, false ) ;
475472 }
476473
477474 /// Wait for BEEFY runtime pallet to be available.
478475 async fn wait_for_runtime_pallet ( & mut self ) {
479- self . client
480- . finality_notification_stream ( )
481- . take_while ( |notif| {
482- let at = BlockId :: hash ( notif. header . hash ( ) ) ;
476+ let mut gossip_engine = & mut self . gossip_engine ;
477+ let mut finality_stream = self . client . finality_notification_stream ( ) . fuse ( ) ;
478+ loop {
479+ futures:: select! {
480+ notif = finality_stream. next( ) => {
481+ let notif = match notif {
482+ Some ( notif) => notif,
483+ None => break
484+ } ;
485+ let at = BlockId :: hash( notif. header. hash( ) ) ;
483486 if let Some ( active) = self . runtime. runtime_api( ) . validator_set( & at) . ok( ) . flatten( ) {
484487 if active. id( ) == GENESIS_AUTHORITY_SET_ID {
485488 // When starting from genesis, there is no session boundary digest.
@@ -490,18 +493,18 @@ where
490493 // worker won't vote until it witnesses a session change.
491494 // Once we'll implement 'initial sync' (catch-up), the worker will be able to
492495 // start voting right away.
493- self . handle_finality_notification ( notif) ;
494- future :: ready ( false )
496+ self . handle_finality_notification( & notif) ;
497+ break
495498 } else {
496499 trace!( target: "beefy" , "🥩 Finality notification: {:?}" , notif) ;
497500 debug!( target: "beefy" , "🥩 Waiting for BEEFY pallet to become available..." ) ;
498- future:: ready ( true )
499501 }
500- } )
501- . for_each ( |_| future:: ready ( ( ) ) )
502- . await ;
503- // get a new stream that provides _new_ notifications (from here on out)
504- self . finality_notifications = self . client . finality_notification_stream ( ) ;
502+ } ,
503+ _ = gossip_engine => {
504+ break
505+ }
506+ }
507+ }
505508 }
506509
507510 /// Main loop for BEEFY worker.
@@ -512,35 +515,37 @@ where
512515 info ! ( target: "beefy" , "🥩 run BEEFY worker, best grandpa: #{:?}." , self . best_grandpa_block_header. number( ) ) ;
513516 self . wait_for_runtime_pallet ( ) . await ;
514517
515- let mut votes = Box :: pin ( self . gossip_engine . lock ( ) . messages_for ( topic :: < B > ( ) ) . filter_map (
516- |notification| async move {
517- trace ! ( target: "beefy" , "🥩 Got vote message: {:?}" , notification) ;
518-
519- VoteMessage :: < NumberFor < B > , AuthorityId , Signature > :: decode (
520- & mut & notification. message [ ..] ,
521- )
522- . ok ( )
523- } ,
524- ) ) ;
518+ let mut finality_notifications = self . client . finality_notification_stream ( ) . fuse ( ) ;
519+ let mut votes = Box :: pin (
520+ self . gossip_engine
521+ . messages_for ( topic :: < B > ( ) )
522+ . filter_map ( |notification| async move {
523+ trace ! ( target: "beefy" , "🥩 Got vote message: {:?}" , notification) ;
524+
525+ VoteMessage :: < NumberFor < B > , AuthorityId , Signature > :: decode (
526+ & mut & notification. message [ ..] ,
527+ )
528+ . ok ( )
529+ } )
530+ . fuse ( ) ,
531+ ) ;
525532
526533 loop {
527534 while self . sync_oracle . is_major_syncing ( ) {
528535 debug ! ( target: "beefy" , "Waiting for major sync to complete..." ) ;
529536 futures_timer:: Delay :: new ( Duration :: from_secs ( 5 ) ) . await ;
530537 }
531538
532- let engine = self . gossip_engine . clone ( ) ;
533- let gossip_engine = future:: poll_fn ( |cx| engine. lock ( ) . poll_unpin ( cx) ) ;
534-
539+ let mut gossip_engine = & mut self . gossip_engine ;
535540 futures:: select! {
536- notification = self . finality_notifications. next( ) . fuse ( ) => {
541+ notification = finality_notifications. next( ) => {
537542 if let Some ( notification) = notification {
538543 self . handle_finality_notification( & notification) ;
539544 } else {
540545 return ;
541546 }
542547 } ,
543- vote = votes. next( ) . fuse ( ) => {
548+ vote = votes. next( ) => {
544549 if let Some ( vote) = vote {
545550 let block_num = vote. commitment. block_number;
546551 if block_num > * self . best_grandpa_block_header. number( ) {
@@ -563,7 +568,7 @@ where
563568 return ;
564569 }
565570 } ,
566- _ = gossip_engine. fuse ( ) => {
571+ _ = gossip_engine => {
567572 error!( target: "beefy" , "🥩 Gossip engine has terminated." ) ;
568573 return ;
569574 }
0 commit comments