44
55use PhpAmqpLib \Channel \AbstractChannel ;
66use PhpAmqpLib \Exception \AMQPChannelException ;
7- use PhpAmqpLib \Wire \ AMQPWriter ;
7+ use PhpAmqpLib \Helper \ Protocol \ FrameBuilder ;
88
99class AMQPChannel extends AbstractChannel
1010{
@@ -40,6 +40,8 @@ public function __construct($connection,
4040 $ auto_decode =true )
4141 {
4242
43+ $ this ->frameBuilder = new FrameBuilder ();
44+
4345 if ($ channel_id == NULL )
4446 $ channel_id = $ connection ->get_free_channel_id ();
4547
@@ -100,11 +102,13 @@ public function close($reply_code=0,
100102 $ reply_text ="" ,
101103 $ method_sig =array (0 , 0 ))
102104 {
103- $ args = new AMQPWriter ();
104- $ args ->write_short ($ reply_code );
105- $ args ->write_shortstr ($ reply_text );
106- $ args ->write_short ($ method_sig [0 ]); // class_id
107- $ args ->write_short ($ method_sig [1 ]); // method_id
105+ $ args = $ this ->frameBuilder ->channelClose (
106+ $ reply_code ,
107+ $ reply_text ,
108+ $ method_sig [0 ],
109+ $ method_sig [1 ]
110+ );
111+
108112 $ this ->send_method_frame (array (20 , 40 ), $ args );
109113 return $ this ->wait (array (
110114 "20,41 " // Channel.close_ok
@@ -139,8 +143,7 @@ protected function close_ok($args)
139143 */
140144 public function flow ($ active )
141145 {
142- $ args = new AMQPWriter ();
143- $ args ->write_bit ($ active );
146+ $ args = $ this ->frameBuilder ->flow ($ active );
144147 $ this ->send_method_frame (array (20 , 20 ), $ args );
145148 return $ this ->wait (array (
146149 "20,21 " //Channel.flow_ok
@@ -155,8 +158,7 @@ protected function _flow($args)
155158
156159 protected function x_flow_ok ($ active )
157160 {
158- $ args = new AMQPWriter ();
159- $ args ->write_bit ($ active );
161+ $ args = $ this ->frameBuilder ->flow ($ active );
160162 $ this ->send_method_frame (array (20 , 21 ), $ args );
161163 }
162164
@@ -170,8 +172,7 @@ protected function x_open($out_of_band="")
170172 if ($ this ->is_open )
171173 return ;
172174
173- $ args = new AMQPWriter ();
174- $ args ->write_shortstr ($ out_of_band );
175+ $ args = $ this ->frameBuilder ->xOpen ($ out_of_band );
175176 $ this ->send_method_frame (array (20 , 10 ), $ args );
176177 return $ this ->wait (array (
177178 "20,11 " //Channel.open_ok
@@ -193,13 +194,10 @@ protected function open_ok($args)
193194 public function access_request ($ realm , $ exclusive =false ,
194195 $ passive =false , $ active =false , $ write =false , $ read =false )
195196 {
196- $ args = new AMQPWriter ();
197- $ args ->write_shortstr ($ realm );
198- $ args ->write_bit ($ exclusive );
199- $ args ->write_bit ($ passive );
200- $ args ->write_bit ($ active );
201- $ args ->write_bit ($ write );
202- $ args ->write_bit ($ read );
197+ $ args = $ this ->frameBuilder
198+ ->accessRequest ($ realm , $ exclusive ,
199+ $ passive , $ active ,
200+ $ write , $ read );
203201 $ this ->send_method_frame (array (30 , 10 ), $ args );
204202 return $ this ->wait (array (
205203 "30,11 " //Channel.access_request_ok
@@ -229,22 +227,17 @@ public function exchange_declare($exchange,
229227 $ arguments =NULL ,
230228 $ ticket =NULL )
231229 {
232- if ($ arguments ==NULL )
233- $ arguments = array ();
234230
235- $ args = new AMQPWriter ();
236- if ($ ticket != NULL )
237- $ args ->write_short ($ ticket );
238- else
239- $ args ->write_short ($ this ->default_ticket );
240- $ args ->write_shortstr ($ exchange );
241- $ args ->write_shortstr ($ type );
242- $ args ->write_bit ($ passive );
243- $ args ->write_bit ($ durable );
244- $ args ->write_bit ($ auto_delete );
245- $ args ->write_bit ($ internal );
246- $ args ->write_bit ($ nowait );
247- $ args ->write_table ($ arguments );
231+ $ arguments = $ this ->getArguments ($ arguments );
232+ $ ticket = $ this ->getTicket ($ ticket );
233+
234+ $ args = $ this ->frameBuilder ->exchangeDeclare (
235+ $ exchange , $ type , $ passive ,
236+ $ durable , $ auto_delete ,
237+ $ internal , $ nowait ,
238+ $ arguments , $ ticket
239+ );
240+
248241 $ this ->send_method_frame (array (40 , 10 ), $ args );
249242
250243 if (!$ nowait )
@@ -266,14 +259,8 @@ protected function exchange_declare_ok($args)
266259 public function exchange_delete ($ exchange , $ if_unused =false ,
267260 $ nowait =false , $ ticket =NULL )
268261 {
269- $ args = new AMQPWriter ();
270- if ($ ticket != NULL )
271- $ args ->write_short ($ ticket );
272- else
273- $ args ->write_short ($ this ->default_ticket );
274- $ args ->write_shortstr ($ exchange );
275- $ args ->write_bit ($ if_unused );
276- $ args ->write_bit ($ nowait );
262+ $ ticket = $ this ->getTicket ($ ticket );
263+ $ args = $ this ->frameBuilder ->exchangeDelete ($ exchange , $ if_unused , $ nowait , $ ticket );
277264 $ this ->send_method_frame (array (40 , 20 ), $ args );
278265
279266 if (!$ nowait )
@@ -296,19 +283,11 @@ protected function exchange_delete_ok($args)
296283 public function queue_bind ($ queue , $ exchange , $ routing_key ="" ,
297284 $ nowait =false , $ arguments =NULL , $ ticket =NULL )
298285 {
299- if ($ arguments == NULL )
300- $ arguments = array ();
286+ $ arguments = $ this ->getArguments ($ arguments );
287+ $ ticket = $ this ->getTicket ($ ticket );
288+
289+ $ args = $ this ->frameBuilder ->queueBind ($ queue , $ exchange , $ routing_key , $ nowait , $ arguments , $ ticket );
301290
302- $ args = new AMQPWriter ();
303- if ($ ticket != NULL )
304- $ args ->write_short ($ ticket );
305- else
306- $ args ->write_short ($ this ->default_ticket );
307- $ args ->write_shortstr ($ queue );
308- $ args ->write_shortstr ($ exchange );
309- $ args ->write_shortstr ($ routing_key );
310- $ args ->write_bit ($ nowait );
311- $ args ->write_table ($ arguments );
312291 $ this ->send_method_frame (array (50 , 20 ), $ args );
313292
314293 if (!$ nowait )
@@ -330,18 +309,11 @@ protected function queue_bind_ok($args)
330309 public function queue_unbind ($ queue , $ exchange , $ routing_key ="" ,
331310 $ arguments =NULL , $ ticket =NULL )
332311 {
333- if ($ arguments == NULL )
334- $ arguments = array ();
312+ $ arguments = $ this ->getArguments ($ arguments );
313+ $ ticket = $ this ->getTicket ($ ticket );
314+
315+ $ args = $ this ->frameBuilder ->queueUnbind ($ queue , $ exchange , $ routing_key , $ arguments , $ ticket );
335316
336- $ args = new AMQPWriter ();
337- if ($ ticket != NULL )
338- $ args ->write_short ($ ticket );
339- else
340- $ args ->write_short ($ this ->default_ticket );
341- $ args ->write_shortstr ($ queue );
342- $ args ->write_shortstr ($ exchange );
343- $ args ->write_shortstr ($ routing_key );
344- $ args ->write_table ($ arguments );
345317 $ this ->send_method_frame (array (50 , 50 ), $ args );
346318
347319 return $ this ->wait (array (
@@ -368,21 +340,13 @@ public function queue_declare($queue="",
368340 $ arguments =NULL ,
369341 $ ticket =NULL )
370342 {
371- if ( $ arguments == NULL )
372- $ arguments = array ( );
343+ $ arguments = $ this -> getArguments ( $ arguments );
344+ $ ticket = $ this -> getTicket ( $ ticket );
373345
374- $ args = new AMQPWriter ();
375- if ($ ticket != NULL )
376- $ args ->write_short ($ ticket );
377- else
378- $ args ->write_short ($ this ->default_ticket );
379- $ args ->write_shortstr ($ queue );
380- $ args ->write_bit ($ passive );
381- $ args ->write_bit ($ durable );
382- $ args ->write_bit ($ exclusive );
383- $ args ->write_bit ($ auto_delete );
384- $ args ->write_bit ($ nowait );
385- $ args ->write_table ($ arguments );
346+ $ args = $ this ->frameBuilder ->queueDeclare (
347+ $ queue , $ passive , $ durable ,
348+ $ exclusive , $ auto_delete , $ nowait ,
349+ $ arguments , $ ticket );
386350 $ this ->send_method_frame (array (50 , 10 ), $ args );
387351
388352 if (!$ nowait )
@@ -409,16 +373,10 @@ protected function queue_declare_ok($args)
409373 public function queue_delete ($ queue ="" , $ if_unused =false , $ if_empty =false ,
410374 $ nowait =false , $ ticket =NULL )
411375 {
412- $ args = new AMQPWriter ();
413- if ($ ticket != NULL )
414- $ args ->write_short ($ ticket );
415- else
416- $ args ->write_short ($ this ->default_ticket );
376+ $ ticket = $ this ->getTicket ($ ticket );
377+
378+ $ args = $ this ->frameBuilder ->queueDelete ($ queue , $ if_unused , $ if_empty , $ nowait , $ ticket );
417379
418- $ args ->write_shortstr ($ queue );
419- $ args ->write_bit ($ if_unused );
420- $ args ->write_bit ($ if_empty );
421- $ args ->write_bit ($ nowait );
422380 $ this ->send_method_frame (array (50 , 40 ), $ args );
423381
424382 if (!$ nowait )
@@ -440,13 +398,9 @@ protected function queue_delete_ok($args)
440398 */
441399 public function queue_purge ($ queue ="" , $ nowait =false , $ ticket =NULL )
442400 {
443- $ args = new AMQPWriter ();
444- if ($ ticket != NULL )
445- $ args ->write_short ($ ticket );
446- else
447- $ args ->write_short ($ this ->default_ticket );
448- $ args ->write_shortstr ($ queue );
449- $ args ->write_bit ($ nowait );
401+ $ ticket = $ this ->getTicket ($ ticket );
402+ $ args = $ this ->frameBuilder ->queuePurge ($ queue , $ nowait , $ ticket );
403+
450404 $ this ->send_method_frame (array (50 , 30 ), $ args );
451405
452406 if (!$ nowait )
@@ -468,9 +422,7 @@ protected function queue_purge_ok($args)
468422 */
469423 public function basic_ack ($ delivery_tag , $ multiple =false )
470424 {
471- $ args = new AMQPWriter ();
472- $ args ->write_longlong ($ delivery_tag );
473- $ args ->write_bit ($ multiple );
425+ $ args = $ this ->frameBuilder ->basicAck ($ delivery_tag , $ multiple );
474426 $ this ->send_method_frame (array (60 , 80 ), $ args );
475427 }
476428
@@ -479,9 +431,7 @@ public function basic_ack($delivery_tag, $multiple=false)
479431 */
480432 public function basic_cancel ($ consumer_tag , $ nowait =false )
481433 {
482- $ args = new AMQPWriter ();
483- $ args ->write_shortstr ($ consumer_tag );
484- $ args ->write_bit ($ nowait );
434+ $ args = $ this ->frameBuilder ->basicCancel ($ consumer_tag , $ nowait );
485435 $ this ->send_method_frame (array (60 , 30 ), $ args );
486436 return $ this ->wait (array (
487437 "60,31 " // Channel.basic_cancel_ok
@@ -504,17 +454,11 @@ public function basic_consume($queue="", $consumer_tag="", $no_local=false,
504454 $ no_ack =false , $ exclusive =false , $ nowait =false ,
505455 $ callback =NULL , $ ticket =NULL )
506456 {
507- $ args = new AMQPWriter ();
508- if ($ ticket != NULL )
509- $ args ->write_short ($ ticket );
510- else
511- $ args ->write_short ($ this ->default_ticket );
512- $ args ->write_shortstr ($ queue );
513- $ args ->write_shortstr ($ consumer_tag );
514- $ args ->write_bit ($ no_local );
515- $ args ->write_bit ($ no_ack );
516- $ args ->write_bit ($ exclusive );
517- $ args ->write_bit ($ nowait );
457+ $ ticket = $ this ->getTicket ($ ticket );
458+ $ args = $ this ->frameBuilder ->basicConsume (
459+ $ queue , $ consumer_tag , $ no_local ,
460+ $ no_ack , $ exclusive , $ nowait , $ ticket );
461+
518462 $ this ->send_method_frame (array (60 , 20 ), $ args );
519463
520464 if (!$ nowait )
@@ -574,13 +518,9 @@ protected function basic_deliver($args, $msg)
574518 */
575519 public function basic_get ($ queue ="" , $ no_ack =false , $ ticket =NULL )
576520 {
577- $ args = new AMQPWriter ();
578- if ($ ticket != NULL )
579- $ args ->write_short ($ ticket );
580- else
581- $ args ->write_short ($ this ->default_ticket );
582- $ args ->write_shortstr ($ queue );
583- $ args ->write_bit ($ no_ack );
521+ $ ticket = $ this ->getTicket ($ ticket );
522+ $ args = $ this ->frameBuilder ->basicGet ($ queue , $ no_ack , $ ticket );
523+
584524 $ this ->send_method_frame (array (60 , 70 ), $ args );
585525 return $ this ->wait (array (
586526 "60,71 " , //Channel.basic_get_ok
@@ -624,15 +564,11 @@ public function basic_publish($msg, $exchange="", $routing_key="",
624564 $ mandatory =false , $ immediate =false ,
625565 $ ticket =NULL )
626566 {
627- $ args = new AMQPWriter ();
628- if ($ ticket != NULL )
629- $ args ->write_short ($ ticket );
630- else
631- $ args ->write_short ($ this ->default_ticket );
632- $ args ->write_shortstr ($ exchange );
633- $ args ->write_shortstr ($ routing_key );
634- $ args ->write_bit ($ mandatory );
635- $ args ->write_bit ($ immediate );
567+ $ ticket = $ this ->getTicket ($ ticket );
568+ $ args = $ this ->frameBuilder ->basicPublish (
569+ $ exchange , $ routing_key , $ mandatory ,
570+ $ immediate , $ ticket );
571+
636572 $ this ->send_method_frame (array (60 , 40 ), $ args );
637573
638574 $ this ->connection ->send_content ($ this ->channel_id , 60 , 0 ,
@@ -647,10 +583,7 @@ public function basic_publish($msg, $exchange="", $routing_key="",
647583 */
648584 public function basic_qos ($ prefetch_size , $ prefetch_count , $ a_global )
649585 {
650- $ args = new AMQPWriter ();
651- $ args ->write_long ($ prefetch_size );
652- $ args ->write_short ($ prefetch_count );
653- $ args ->write_bit ($ a_global );
586+ $ args = $ this ->frameBuilder ->basicQos ($ prefetch_size , $ prefetch_count , $ a_global );
654587 $ this ->send_method_frame (array (60 , 10 ), $ args );
655588 return $ this ->wait (array (
656589 "60,11 " //Channel.basic_qos_ok
@@ -670,8 +603,7 @@ protected function basic_qos_ok($args)
670603 */
671604 public function basic_recover ($ requeue =false )
672605 {
673- $ args = new AMQPWriter ();
674- $ args ->write_bit ($ requeue );
606+ $ args = $ this ->frameBuilder ->basicRecover ($ requeue );
675607 $ this ->send_method_frame (array (60 , 100 ), $ args );
676608 }
677609
@@ -680,9 +612,7 @@ public function basic_recover($requeue=false)
680612 */
681613 public function basic_reject ($ delivery_tag , $ requeue )
682614 {
683- $ args = new AMQPWriter ();
684- $ args ->write_longlong ($ delivery_tag );
685- $ args ->write_bit ($ requeue );
615+ $ args = $ this ->frameBuilder ->basicRecover ($ delivery_tag , $ requeue );
686616 $ this ->send_method_frame (array (60 , 90 ), $ args );
687617 }
688618
@@ -751,4 +681,14 @@ protected function tx_select_ok($args)
751681 {
752682 }
753683
684+ protected function getArguments ($ arguments )
685+ {
686+ return (null === $ arguments ) ? array () : $ arguments ;
687+ }
688+
689+ protected function getTicket ($ ticket )
690+ {
691+ return (null === $ ticket ) ? $ this ->default_ticket : $ ticket ;
692+ }
693+
754694}
0 commit comments