@@ -35,13 +35,17 @@ enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
3535pub struct Service < N : RawNetwork , D : Data > {
3636 network : N ,
3737 messages_from_user : mpsc:: UnboundedReceiver < Command < D , N :: PeerId > > ,
38- messages_for_user : mpsc:: UnboundedSender < ( D , N :: PeerId ) > ,
38+ messages_for_authentication_user : mpsc:: UnboundedSender < ( D , N :: PeerId ) > ,
39+ messages_for_block_sync_user : mpsc:: UnboundedSender < ( D , N :: PeerId ) > ,
3940 authentication_connected_peers : HashSet < N :: PeerId > ,
4041 authentication_peer_senders : HashMap < N :: PeerId , TracingUnboundedSender < D > > ,
42+ block_sync_connected_peers : HashSet < N :: PeerId > ,
43+ block_sync_peer_senders : HashMap < N :: PeerId , TracingUnboundedSender < D > > ,
4144 spawn_handle : SpawnTaskHandle ,
4245}
4346
4447struct ServiceInterface < D : Data , P : Clone + Debug + Eq + Hash + Send + ' static > {
48+ protocol : Protocol ,
4549 messages_from_service : mpsc:: UnboundedReceiver < ( D , P ) > ,
4650 messages_for_service : mpsc:: UnboundedSender < Command < D , P > > ,
4751}
@@ -70,7 +74,7 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv
7074
7175 fn send_to ( & mut self , data : D , peer_id : Self :: PeerId ) -> Result < ( ) , Self :: Error > {
7276 self . messages_for_service
73- . unbounded_send ( Command :: Send ( data, peer_id, Protocol :: Authentication ) )
77+ . unbounded_send ( Command :: Send ( data, peer_id, self . protocol ) )
7478 . map_err ( |_| Error :: ServiceStopped )
7579 }
7680
@@ -80,17 +84,13 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv
8084 peer_ids : HashSet < Self :: PeerId > ,
8185 ) -> Result < ( ) , Self :: Error > {
8286 self . messages_for_service
83- . unbounded_send ( Command :: SendToRandom (
84- data,
85- peer_ids,
86- Protocol :: Authentication ,
87- ) )
87+ . unbounded_send ( Command :: SendToRandom ( data, peer_ids, self . protocol ) )
8888 . map_err ( |_| Error :: ServiceStopped )
8989 }
9090
9191 fn broadcast ( & mut self , data : D ) -> Result < ( ) , Self :: Error > {
9292 self . messages_for_service
93- . unbounded_send ( Command :: Broadcast ( data, Protocol :: Authentication ) )
93+ . unbounded_send ( Command :: Broadcast ( data, self . protocol ) )
9494 . map_err ( |_| Error :: ServiceStopped )
9595 }
9696
@@ -115,20 +115,32 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
115115 ) -> (
116116 Service < N , D > ,
117117 impl Network < D , Error = Error , PeerId = N :: PeerId > ,
118+ impl Network < D , Error = Error , PeerId = N :: PeerId > ,
118119 ) {
119- let ( messages_for_user, messages_from_service) = mpsc:: unbounded ( ) ;
120+ let ( messages_for_authentication_user, messages_from_authentication_service) =
121+ mpsc:: unbounded ( ) ;
122+ let ( messages_for_block_sync_user, messages_from_block_sync_service) = mpsc:: unbounded ( ) ;
120123 let ( messages_for_service, messages_from_user) = mpsc:: unbounded ( ) ;
121124 (
122125 Service {
123126 network,
124127 messages_from_user,
125- messages_for_user,
128+ messages_for_authentication_user,
129+ messages_for_block_sync_user,
126130 spawn_handle,
127131 authentication_connected_peers : HashSet :: new ( ) ,
128132 authentication_peer_senders : HashMap :: new ( ) ,
133+ block_sync_connected_peers : HashSet :: new ( ) ,
134+ block_sync_peer_senders : HashMap :: new ( ) ,
135+ } ,
136+ ServiceInterface {
137+ protocol : Protocol :: Authentication ,
138+ messages_from_service : messages_from_authentication_service,
139+ messages_for_service : messages_for_service. clone ( ) ,
129140 } ,
130141 ServiceInterface {
131- messages_from_service,
142+ protocol : Protocol :: BlockSync ,
143+ messages_from_service : messages_from_block_sync_service,
132144 messages_for_service,
133145 } ,
134146 )
@@ -141,6 +153,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
141153 ) -> Option < & mut TracingUnboundedSender < D > > {
142154 match protocol {
143155 Protocol :: Authentication => self . authentication_peer_senders . get_mut ( peer) ,
156+ Protocol :: BlockSync => self . block_sync_peer_senders . get_mut ( peer) ,
144157 }
145158 }
146159
@@ -211,6 +224,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
211224 fn protocol_peers ( & self , protocol : Protocol ) -> & HashSet < N :: PeerId > {
212225 match protocol {
213226 Protocol :: Authentication => & self . authentication_connected_peers ,
227+ Protocol :: BlockSync => & self . block_sync_connected_peers ,
214228 }
215229 }
216230
@@ -262,6 +276,12 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
262276 self . authentication_peer_senders . insert ( peer. clone ( ) , tx) ;
263277 rx
264278 }
279+ Protocol :: BlockSync => {
280+ let ( tx, rx) = tracing_unbounded ( "mpsc_notification_stream_block_sync" ) ;
281+ self . block_sync_connected_peers . insert ( peer. clone ( ) ) ;
282+ self . block_sync_peer_senders . insert ( peer. clone ( ) , tx) ;
283+ rx
284+ }
265285 } ;
266286 self . spawn_handle . spawn (
267287 "aleph/network/peer_sender" ,
@@ -276,19 +296,33 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
276296 self . authentication_connected_peers . remove ( & peer) ;
277297 self . authentication_peer_senders . remove ( & peer) ;
278298 }
299+ Protocol :: BlockSync => {
300+ self . block_sync_connected_peers . remove ( & peer) ;
301+ self . block_sync_peer_senders . remove ( & peer) ;
302+ }
279303 }
280304 }
281305 Messages ( peer_id, messages) => {
282306 for ( protocol, data) in messages. into_iter ( ) {
283307 match protocol {
284308 Protocol :: Authentication => match D :: decode ( & mut & data[ ..] ) {
285309 Ok ( data) => self
286- . messages_for_user
310+ . messages_for_authentication_user
287311 . unbounded_send ( ( data, peer_id. clone ( ) ) ) ?,
288312 Err ( e) => {
289313 warn ! ( target: "aleph-network" , "Error decoding authentication protocol message: {}" , e)
290314 }
291315 } ,
316+ // This is a bit of a placeholder for now, as we are not yet using this
317+ // protocol. In the future we will not be using the same D as above.
318+ Protocol :: BlockSync => match D :: decode ( & mut & data[ ..] ) {
319+ Ok ( data) => self
320+ . messages_for_block_sync_user
321+ . unbounded_send ( ( data, peer_id. clone ( ) ) ) ?,
322+ Err ( e) => {
323+ warn ! ( target: "aleph-network" , "Error decoding block sync protocol message: {}" , e)
324+ }
325+ } ,
292326 } ;
293327 }
294328 }
@@ -303,6 +337,10 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
303337 "authentication connected peers - {:?}; " ,
304338 self . authentication_connected_peers. len( )
305339 ) ) ;
340+ status. push_str ( & format ! (
341+ "block sync connected peers - {:?}; " ,
342+ self . block_sync_connected_peers. len( )
343+ ) ) ;
306344
307345 info ! ( target: "aleph-network" , "{}" , status) ;
308346 }
@@ -379,7 +417,7 @@ mod tests {
379417
380418 // Prepare service
381419 let network = MockRawNetwork :: new ( event_stream_oneshot_tx) ;
382- let ( service, gossip_network) =
420+ let ( service, gossip_network, _ ) =
383421 Service :: new ( network. clone ( ) , task_manager. spawn_handle ( ) ) ;
384422 let gossip_network = Box :: new ( gossip_network) ;
385423
0 commit comments