@@ -251,6 +251,7 @@ macro_rules! test (
251251 #[ allow( unused_imports) ] ;
252252
253253 use native;
254+ use comm:: * ;
254255 use prelude:: * ;
255256 use super :: * ;
256257 use super :: super :: * ;
@@ -323,6 +324,20 @@ pub struct SharedChan<T> {
323324 priv queue : mpsc:: Producer < T , Packet > ,
324325}
325326
327+ /// This enumeration is the list of the possible reasons that try_recv could not
328+ /// return data when called.
329+ #[ deriving( Eq , Clone ) ]
330+ pub enum TryRecvResult < T > {
331+ /// This channel is currently empty, but the sender(s) have not yet
332+ /// disconnected, so data may yet become available.
333+ Empty ,
334+ /// This channel's sending half has become disconnected, and there will
335+ /// never be any more data received on this channel
336+ Disconnected ,
337+ /// The channel had some data and we successfully popped it
338+ Data ( T ) ,
339+ }
340+
326341///////////////////////////////////////////////////////////////////////////////
327342// Internal struct definitions
328343///////////////////////////////////////////////////////////////////////////////
@@ -739,11 +754,11 @@ impl<T: Send> Port<T> {
739754 /// block on a port.
740755 ///
741756 /// This function cannot fail.
742- pub fn try_recv ( & self ) -> Option < T > {
757+ pub fn try_recv ( & self ) -> TryRecvResult < T > {
743758 self . try_recv_inc ( true )
744759 }
745760
746- fn try_recv_inc ( & self , increment : bool ) -> Option < T > {
761+ fn try_recv_inc ( & self , increment : bool ) -> TryRecvResult < T > {
747762 // This is a "best effort" situation, so if a queue is inconsistent just
748763 // don't worry about it.
749764 let this = unsafe { cast:: transmute_mut ( self ) } ;
@@ -807,7 +822,35 @@ impl<T: Send> Port<T> {
807822 if increment && ret. is_some ( ) {
808823 unsafe { ( * this. queue . packet ( ) ) . steals += 1 ; }
809824 }
810- return ret;
825+ match ret {
826+ Some ( t) => Data ( t) ,
827+ None => {
828+ // It's possible that between the time that we saw the queue was
829+ // empty and here the other side disconnected. It's also
830+ // possible for us to see the disconnection here while there is
831+ // data in the queue. It's pretty backwards-thinking to return
832+ // Disconnected when there's actually data on the queue, so if
833+ // we see a disconnected state be sure to check again to be 100%
834+ // sure that there's no data in the queue.
835+ let cnt = unsafe { ( * this. queue . packet ( ) ) . cnt . load ( Relaxed ) } ;
836+ if cnt != DISCONNECTED { return Empty }
837+
838+ let ret = match this. queue {
839+ SPSC ( ref mut queue) => queue. pop ( ) ,
840+ MPSC ( ref mut queue) => match queue. pop ( ) {
841+ mpsc:: Data ( t) => Some ( t) ,
842+ mpsc:: Empty => None ,
843+ mpsc:: Inconsistent => {
844+ fail ! ( "inconsistent with no senders?!" ) ;
845+ }
846+ }
847+ } ;
848+ match ret {
849+ Some ( data) => Data ( data) ,
850+ None => Disconnected ,
851+ }
852+ }
853+ }
811854 }
812855
813856 /// Attempt to wait for a value on this port, but does not fail if the
@@ -824,7 +867,11 @@ impl<T: Send> Port<T> {
824867 /// the value found on the port is returned.
825868 pub fn recv_opt ( & self ) -> Option < T > {
826869 // optimistic preflight check (scheduling is expensive)
827- match self . try_recv ( ) { None => { } , data => return data }
870+ match self . try_recv ( ) {
871+ Empty => { } ,
872+ Disconnected => return None ,
873+ Data ( t) => return Some ( t) ,
874+ }
828875
829876 let packet;
830877 let this;
@@ -843,12 +890,11 @@ impl<T: Send> Port<T> {
843890 } ) ;
844891 }
845892
846- let data = self . try_recv_inc ( false ) ;
847- if data . is_none ( ) &&
848- unsafe { ( * packet ) . cnt . load ( SeqCst ) } != DISCONNECTED {
849- fail ! ( "bug: woke up too soon {}" , unsafe { ( * packet ) . cnt . load ( SeqCst ) } ) ;
893+ match self . try_recv_inc ( false ) {
894+ Data ( t ) => Some ( t ) ,
895+ Empty => fail ! ( "bug: woke up too soon" ) ,
896+ Disconnected => None ,
850897 }
851- return data;
852898 }
853899
854900 /// Returns an iterator which will block waiting for messages, but never
@@ -1005,7 +1051,10 @@ mod test {
10051051 for _ in range( 0 , AMT * NTHREADS ) {
10061052 assert_eq!( p. recv( ) , 1 ) ;
10071053 }
1008- assert_eq!( p. try_recv( ) , None ) ;
1054+ match p. try_recv( ) {
1055+ Data ( ..) => fail!( ) ,
1056+ _ => { }
1057+ }
10091058 c1. send( ( ) ) ;
10101059 }
10111060
@@ -1129,7 +1178,7 @@ mod test {
11291178 test ! ( fn oneshot_single_thread_try_recv_open( ) {
11301179 let ( port, chan) = Chan :: <int>:: new( ) ;
11311180 chan. send( 10 ) ;
1132- assert!( port. try_recv ( ) == Some ( 10 ) ) ;
1181+ assert!( port. recv_opt ( ) == Some ( 10 ) ) ;
11331182 } )
11341183
11351184 test ! ( fn oneshot_single_thread_try_recv_closed( ) {
@@ -1140,21 +1189,21 @@ mod test {
11401189
11411190 test ! ( fn oneshot_single_thread_peek_data( ) {
11421191 let ( port, chan) = Chan :: <int>:: new( ) ;
1143- assert !( port. try_recv( ) . is_none ( ) ) ;
1192+ assert_eq !( port. try_recv( ) , Empty )
11441193 chan. send( 10 ) ;
1145- assert !( port. try_recv( ) . is_some ( ) ) ;
1194+ assert_eq !( port. try_recv( ) , Data ( 10 ) ) ;
11461195 } )
11471196
11481197 test ! ( fn oneshot_single_thread_peek_close( ) {
11491198 let ( port, chan) = Chan :: <int>:: new( ) ;
11501199 { let _c = chan; }
1151- assert !( port. try_recv( ) . is_none ( ) ) ;
1152- assert !( port. try_recv( ) . is_none ( ) ) ;
1200+ assert_eq !( port. try_recv( ) , Disconnected ) ;
1201+ assert_eq !( port. try_recv( ) , Disconnected ) ;
11531202 } )
11541203
11551204 test ! ( fn oneshot_single_thread_peek_open( ) {
11561205 let ( port, _) = Chan :: <int>:: new( ) ;
1157- assert !( port. try_recv( ) . is_none ( ) ) ;
1206+ assert_eq !( port. try_recv( ) , Empty ) ;
11581207 } )
11591208
11601209 test ! ( fn oneshot_multi_task_recv_then_send( ) {
@@ -1321,4 +1370,27 @@ mod test {
13211370 drop( chan) ;
13221371 assert_eq!( count_port. recv( ) , 4 ) ;
13231372 } )
1373+
1374+ test ! ( fn try_recv_states( ) {
1375+ let ( p, c) = Chan :: <int>:: new( ) ;
1376+ let ( p1, c1) = Chan :: <( ) >:: new( ) ;
1377+ let ( p2, c2) = Chan :: <( ) >:: new( ) ;
1378+ do spawn {
1379+ p1. recv( ) ;
1380+ c. send( 1 ) ;
1381+ c2. send( ( ) ) ;
1382+ p1. recv( ) ;
1383+ drop( c) ;
1384+ c2. send( ( ) ) ;
1385+ }
1386+
1387+ assert_eq!( p. try_recv( ) , Empty ) ;
1388+ c1. send( ( ) ) ;
1389+ p2. recv( ) ;
1390+ assert_eq!( p. try_recv( ) , Data ( 1 ) ) ;
1391+ assert_eq!( p. try_recv( ) , Empty ) ;
1392+ c1. send( ( ) ) ;
1393+ p2. recv( ) ;
1394+ assert_eq!( p. try_recv( ) , Disconnected ) ;
1395+ } )
13241396}
0 commit comments