@@ -48,7 +48,7 @@ use crate::{
4848} ;
4949use crate :: environment:: HasVoted ;
5050use gossip:: {
51- GossipMessage , FullCatchUpMessage , FullCommitMessage , VoteOrPrecommitMessage , GossipValidator
51+ GossipMessage , FullCatchUpMessage , FullCommitMessage , VoteMessage , GossipValidator
5252} ;
5353use fg_primitives:: {
5454 AuthorityPair , AuthorityId , AuthoritySignature , SetId as SetIdNumber , RoundNumber ,
@@ -148,12 +148,21 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
148148 type In = NetworkStream ;
149149
150150 fn messages_for ( & self , topic : B :: Hash ) -> Self :: In {
151+ // Given that one can only communicate with the Substrate network via the `NetworkService` via message-passing,
152+ // and given that methods on the network consensus gossip are not exposed but only reachable by passing a
153+ // closure into `with_gossip` on the `NetworkService` this function needs to make use of the `NetworkStream`
154+ // construction.
155+ //
156+ // We create a oneshot channel and pass the sender within a closure to the network. At some point in the future
157+ // the network passes the message channel back through the oneshot channel. But the consumer of this function
158+ // expects a stream, not a stream within a oneshot. This complexity is abstracted within `NetworkStream`,
159+ // waiting for the oneshot to resolve and from there on acting like a normal message channel.
151160 let ( tx, rx) = oneshot:: channel ( ) ;
152161 self . with_gossip ( move |gossip, _| {
153162 let inner_rx = gossip. messages_for ( GRANDPA_ENGINE_ID , topic) ;
154163 let _ = tx. send ( inner_rx) ;
155164 } ) ;
156- NetworkStream { outer : rx , inner : None }
165+ NetworkStream :: PollingOneshot ( rx )
157166 }
158167
159168 fn register_validator ( & self , validator : Arc < dyn network_gossip:: Validator < B > > ) {
@@ -202,28 +211,40 @@ impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
202211 }
203212}
204213
205- /// A stream used by NetworkBridge in its implementation of Network.
206- pub struct NetworkStream {
207- inner : Option < mpsc:: UnboundedReceiver < network_gossip:: TopicNotification > > ,
208- outer : oneshot:: Receiver < mpsc:: UnboundedReceiver < network_gossip:: TopicNotification > >
214+ /// A stream used by NetworkBridge in its implementation of Network. Given a oneshot that eventually returns a channel
215+ /// which eventually returns messages, instead of:
216+ ///
217+ /// 1. polling the oneshot until it returns a message channel
218+ ///
219+ /// 2. polling the message channel for messages
220+ ///
221+ /// `NetworkStream` combines the two steps into one, requiring a consumer to only poll `NetworkStream` to retrieve
222+ /// messages directly.
223+ pub enum NetworkStream {
224+ PollingOneshot ( oneshot:: Receiver < mpsc:: UnboundedReceiver < network_gossip:: TopicNotification > > ) ,
225+ PollingTopicNotifications ( mpsc:: UnboundedReceiver < network_gossip:: TopicNotification > ) ,
209226}
210227
211228impl Stream for NetworkStream {
212229 type Item = network_gossip:: TopicNotification ;
213230 type Error = ( ) ;
214231
215232 fn poll ( & mut self ) -> Poll < Option < Self :: Item > , Self :: Error > {
216- if let Some ( ref mut inner) = self . inner {
217- return inner. poll ( ) ;
218- }
219- match self . outer . poll ( ) {
220- Ok ( futures:: Async :: Ready ( mut inner) ) => {
221- let poll_result = inner. poll ( ) ;
222- self . inner = Some ( inner) ;
223- poll_result
233+ match self {
234+ NetworkStream :: PollingOneshot ( oneshot) => {
235+ match oneshot. poll ( ) {
236+ Ok ( futures:: Async :: Ready ( mut stream) ) => {
237+ let poll_result = stream. poll ( ) ;
238+ * self = NetworkStream :: PollingTopicNotifications ( stream) ;
239+ poll_result
240+ } ,
241+ Ok ( futures:: Async :: NotReady ) => Ok ( futures:: Async :: NotReady ) ,
242+ Err ( _) => Err ( ( ) )
243+ }
244+ } ,
245+ NetworkStream :: PollingTopicNotifications ( stream) => {
246+ stream. poll ( )
224247 } ,
225- Ok ( futures:: Async :: NotReady ) => Ok ( futures:: Async :: NotReady ) ,
226- Err ( _) => Err ( ( ) )
227248 }
228249 }
229250}
@@ -275,8 +296,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
275296 validator. note_round ( Round ( round. number ) , |_, _| { } ) ;
276297
277298 for signed in round. votes . iter ( ) {
278- let message = gossip:: GossipMessage :: VoteOrPrecommit (
279- gossip:: VoteOrPrecommitMessage :: < B > {
299+ let message = gossip:: GossipMessage :: Vote (
300+ gossip:: VoteMessage :: < B > {
280301 message : signed. clone ( ) ,
281302 round : Round ( round. number ) ,
282303 set_id : SetId ( set_id) ,
@@ -341,7 +362,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
341362 ) ;
342363 }
343364
344- /// Get the round messages for a round in the current set ID. These are signature-checked.
365+ /// Get a stream of signature-checked round messages from the network as well as a sink for round messages to the
366+ /// network all within the current set.
345367 pub ( crate ) fn round_communication (
346368 & self ,
347369 round : Round ,
@@ -379,7 +401,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
379401 } )
380402 . and_then ( move |msg| {
381403 match msg {
382- GossipMessage :: VoteOrPrecommit ( msg) => {
404+ GossipMessage :: Vote ( msg) => {
383405 // check signature.
384406 if !voters. contains_key ( & msg. message . id ) {
385407 debug ! ( target: "afg" , "Skipping message from unknown voter {}" , msg. message. id) ;
@@ -707,7 +729,7 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
707729 id : local_id. clone ( ) ,
708730 } ;
709731
710- let message = GossipMessage :: VoteOrPrecommit ( VoteOrPrecommitMessage :: < Block > {
732+ let message = GossipMessage :: Vote ( VoteMessage :: < Block > {
711733 message : signed. clone ( ) ,
712734 round : Round ( self . round ) ,
713735 set_id : SetId ( self . set_id ) ,
0 commit comments