@@ -160,105 +160,23 @@ impl PubsubConnectionInner {
160160 } ;
161161
162162 match message_type. as_slice ( ) {
163- b"subscribe" => match self . pending_subs . remove ( & topic) {
164- Some ( ( sender, signal) ) => {
165- self . subscriptions . insert ( topic, sender) ;
166- signal
167- . send ( ( ) )
168- . map_err ( |( ) | error:: internal ( "Error confirming subscription" ) ) ?
169- }
170- None => {
171- return Err ( error:: internal ( format ! (
172- "Received unexpected subscribe notification for topic: {}" ,
173- topic
174- ) ) ) ;
175- }
176- } ,
177- b"psubscribe" => match self . pending_psubs . remove ( & topic) {
178- Some ( ( sender, signal) ) => {
179- self . psubscriptions . insert ( topic, sender) ;
180- signal
181- . send ( ( ) )
182- . map_err ( |( ) | error:: internal ( "Error confirming subscription" ) ) ?
183- }
184- None => {
185- return Err ( error:: internal ( format ! (
186- "Received unexpected subscribe notification for topic: {}" ,
187- topic
188- ) ) ) ;
189- }
190- } ,
191- b"unsubscribe" => {
192- match self . subscriptions . entry ( topic) {
193- Entry :: Occupied ( entry) => {
194- entry. remove_entry ( ) ;
195- }
196- Entry :: Vacant ( vacant) => {
197- return Err ( error:: internal ( format ! (
198- "Unexpected unsubscribe message: {}" ,
199- vacant. key( )
200- ) ) ) ;
201- }
202- }
203- if self . subscriptions . is_empty ( ) {
204- return Ok ( false ) ;
205- }
163+ b"subscribe" => {
164+ process_subscribe ( & mut self . pending_subs , & mut self . subscriptions , topic)
206165 }
207- b"punsubscribe" => {
208- match self . psubscriptions . entry ( topic) {
209- Entry :: Occupied ( entry) => {
210- entry. remove_entry ( ) ;
211- }
212- Entry :: Vacant ( vacant) => {
213- return Err ( error:: internal ( format ! (
214- "Unexpected unsubscribe message: {}" ,
215- vacant. key( )
216- ) ) ) ;
217- }
218- }
219- if self . psubscriptions . is_empty ( ) {
220- return Ok ( false ) ;
221- }
166+ b"psubscribe" => {
167+ process_subscribe ( & mut self . pending_psubs , & mut self . psubscriptions , topic)
222168 }
223- b"message" => match self . subscriptions . get ( & topic) {
224- Some ( sender) => {
225- if let Err ( error) = sender. unbounded_send ( Ok ( msg) ) {
226- if !error. is_disconnected ( ) {
227- return Err ( error:: internal ( format ! ( "Cannot send message: {}" , error) ) ) ;
228- }
229- }
230- }
231- None => {
232- return Err ( error:: internal ( format ! (
233- "Unexpected message on topic: {}" ,
234- topic
235- ) ) ) ;
236- }
237- } ,
238- b"pmessage" => match self . psubscriptions . get ( & topic) {
239- Some ( sender) => {
240- if let Err ( error) = sender. unbounded_send ( Ok ( msg) ) {
241- if !error. is_disconnected ( ) {
242- return Err ( error:: internal ( format ! ( "Cannot send message: {}" , error) ) ) ;
243- }
244- }
245- }
246- None => {
247- return Err ( error:: internal ( format ! (
248- "Unexpected message on topic: {}" ,
249- topic
250- ) ) ) ;
251- }
252- } ,
169+ b"unsubscribe" => process_unsubscribe ( & mut self . subscriptions , topic) ,
170+ b"punsubscribe" => process_unsubscribe ( & mut self . psubscriptions , topic) ,
171+ b"message" => process_message ( & self . subscriptions , & topic, msg) ,
172+ b"pmessage" => process_message ( & self . psubscriptions , & topic, msg) ,
253173 t => {
254174 return Err ( error:: internal ( format ! (
255175 "Unexpected data on Pub/Sub connection: {}" ,
256176 String :: from_utf8_lossy( t)
257177 ) ) ) ;
258178 }
259179 }
260-
261- Ok ( true )
262180 }
263181
264182 /// Returns true, if there are still valid subscriptions at the end, or false if not, i.e. the whole thing can be dropped.
@@ -314,6 +232,72 @@ impl PubsubConnectionInner {
314232 }
315233}
316234
235+ fn process_subscribe (
236+ pending_subs : & mut BTreeMap < String , ( PubsubSink , oneshot:: Sender < ( ) > ) > ,
237+ subscriptions : & mut BTreeMap < String , PubsubSink > ,
238+ topic : String ,
239+ ) -> Result < bool , error:: Error > {
240+ match pending_subs. remove ( & topic) {
241+ Some ( ( sender, signal) ) => {
242+ subscriptions. insert ( topic, sender) ;
243+ signal
244+ . send ( ( ) )
245+ . map_err ( |( ) | error:: internal ( "Error confirming subscription" ) ) ?
246+ }
247+ None => {
248+ return Err ( error:: internal ( format ! (
249+ "Received unexpected subscribe notification for topic: {}" ,
250+ topic
251+ ) ) ) ;
252+ }
253+ } ;
254+ Ok ( true )
255+ }
256+
257+ fn process_unsubscribe (
258+ subscriptions : & mut BTreeMap < String , PubsubSink > ,
259+ topic : String ,
260+ ) -> Result < bool , error:: Error > {
261+ match subscriptions. entry ( topic) {
262+ Entry :: Occupied ( entry) => {
263+ entry. remove_entry ( ) ;
264+ }
265+ Entry :: Vacant ( vacant) => {
266+ return Err ( error:: internal ( format ! (
267+ "Unexpected unsubscribe message: {}" ,
268+ vacant. key( )
269+ ) ) ) ;
270+ }
271+ }
272+ if subscriptions. is_empty ( ) {
273+ return Ok ( false ) ;
274+ }
275+ Ok ( true )
276+ }
277+
278+ fn process_message (
279+ subscriptions : & BTreeMap < String , PubsubSink > ,
280+ topic : & str ,
281+ msg : resp:: RespValue ,
282+ ) -> Result < bool , error:: Error > {
283+ match subscriptions. get ( topic) {
284+ Some ( sender) => {
285+ if let Err ( error) = sender. unbounded_send ( Ok ( msg) ) {
286+ if !error. is_disconnected ( ) {
287+ return Err ( error:: internal ( format ! ( "Cannot send message: {}" , error) ) ) ;
288+ }
289+ }
290+ }
291+ None => {
292+ return Err ( error:: internal ( format ! (
293+ "Unexpected message on topic: {}" ,
294+ topic
295+ ) ) ) ;
296+ }
297+ } ;
298+ Ok ( true )
299+ }
300+
317301impl Future for PubsubConnectionInner {
318302 type Output = Result < ( ) , error:: Error > ;
319303
0 commit comments