Skip to content

Commit b18ce04

Browse files
committed
Merge remote-tracking branch 'origin/master' into async-events
2 parents a24a972 + cf9babd commit b18ce04

File tree

18 files changed

+604
-5
lines changed

18 files changed

+604
-5
lines changed

README.md

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This is where all development happens. The repository provides a friendly enviro
88
Features:
99

1010
* [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transport [abstraction](https://github.com/php-enqueue/psr-queue).
11-
* Feature rich.
11+
* [Feature rich](docs/quick_tour.md).
1212
* Supports transports:
1313
- [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ and others),
1414
- [STOMP](docs/transport/stomp.md)
@@ -18,7 +18,56 @@ Features:
1818
- [Filesystem](docs/transport/filesystem.md)
1919
- [Null](docs/transport/null.md).
2020
* Generic purpose abstraction level (the transport level).
21+
22+
```php
23+
<?php
24+
use function Enqueue\dsn_to_context;
25+
use function Enqueue\send_queue;
26+
use function Enqueue\consume;
27+
use Enqueue\Psr\PsrMessage;
28+
use Enqueue\Consumption\Result;
29+
30+
// composer require enqueue/enqueue enqueue/amqp-ext
31+
32+
$c = dsn_to_context('amqp:://');
33+
34+
send_queue($c, 'a_queue', 'Hello there');
35+
36+
consume($c, 'a_queue', function(PsrMessage $message) {
37+
$body = $message->getBody();
38+
39+
// to stop consumption: throw new \Enqueue\Consumption\Exception\ConsumptionInterruptedException;
40+
41+
return Result::ACK;
42+
});
43+
```
44+
2145
* Easy to use abstraction level (the client level).
46+
47+
```php
48+
<?php
49+
use Enqueue\SimpleClient\SimpleClient;
50+
use Enqueue\Psr\PsrMessage;
51+
use Enqueue\Consumption\Result;
52+
53+
// composer require enqueue/simple-client enqueue/fs
54+
55+
$client = new SimpleClient('file://');
56+
$client->bind('a_topic', 'a_processor', function(PsrMessage $message) {
57+
$body = $message->getBody();
58+
59+
// to stop consumption: throw new \Enqueue\Consumption\Exception\ConsumptionInterruptedException;
60+
61+
return Result::ACK;
62+
});
63+
64+
$client->setupBroker();
65+
66+
$client->send('a_topic', 'Hello there');
67+
68+
$client->consume();
69+
```
70+
2271
* [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
2372
* [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md)
2473
* [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support.
@@ -30,6 +79,7 @@ Features:
3079

3180
## Resources
3281

82+
* [Quick tour](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/quick_tour.md)
3383
* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
3484
* [Questions](https://gitter.im/php-enqueue/Lobby)
3585
* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)

composer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
"symfony/monolog-bundle": "^2.8|^3",
2525
"symfony/browser-kit": "^2.8|^3",
2626
"symfony/expression-language": "^2.8|^3",
27+
"symfony/event-dispatcher": "^2.8|^3",
28+
"symfony/console": "^2.8|^3",
2729
"friendsofphp/php-cs-fixer": "^2",
2830
"empi89/php-amqp-stubs": "*@dev",
2931
"phpstan/phpstan": "^0.7.0"

docs/bundle/message_producer.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Message producer
2+
3+
You can choose how to send messages either using a transport directly or with the client.
4+
Transport gives you the access to all transport specific features so you can tune things where the client provides you with easy to use abstraction.
5+
6+
## Transport
7+
8+
```php
9+
<?php
10+
11+
/** @var Symfony\Component\DependencyInjection\ContainerInterface $container */
12+
13+
/** @var Enqueue\Psr\PsrContext $context */
14+
$context = $container->get('enqueue.transport.context');
15+
16+
$context->createProducer()->send(
17+
$context->createQueue('a_queue'),
18+
$context->createMessage('Hello there!')
19+
);
20+
```
21+
22+
## Client
23+
24+
The client is shipped with two types of producers. The first one sends messages immediately
25+
where another one (it is called spool producer) collects them in memory and sends them `onTerminate` event (the response is already sent).
26+
27+
28+
29+
```php
30+
<?php
31+
32+
/** @var Symfony\Component\DependencyInjection\ContainerInterface $container */
33+
34+
/** @var \Enqueue\Client\ProducerInterface $producer */
35+
$producer = $container->get('enqueue.producer');
36+
37+
// message is being sent right now
38+
$producer->send('a_topic', 'Hello there!');
39+
40+
41+
/** @var \Enqueue\Client\SpoolProducer $spoolProducer */
42+
$spoolProducer = $container->get('enqueue.spool_producer');
43+
44+
// message is being sent on console.terminate or kernel.terminate event
45+
$spoolProducer->send('a_topic', 'Hello there!');
46+
47+
// you could send queued messages manually by calling flush method
48+
$spoolProducer->flush();
49+
```
50+
51+
[back to index](../index.md)

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
- [Quick tour](bundle/quick_tour.md)
2626
- [Config reference](bundle/config_reference.md)
2727
- [Cli commands](bundle/cli_commands.md)
28+
- [Message producer](bundle/message_producer.md)
2829
- [Message processor](bundle/message_processor.md)
2930
- [Job queue](bundle/job_queue.md)
3031
- [Consumption extension](bundle/consumption_extension.md)

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public function load(array $configs, ContainerBuilder $container)
6464

6565
if (isset($config['client'])) {
6666
$loader->load('client.yml');
67+
$loader->load('extensions/flush_spool_producer_extension.yml');
6768

6869
foreach ($config['transport'] as $name => $transportConfig) {
6970
$this->factories[$name]->createDriver($container, $transportConfig);
@@ -88,10 +89,10 @@ public function load(array $configs, ContainerBuilder $container)
8889
$container->setParameter('enqueue.client.default_queue_name', $config['client']['default_processor_queue']);
8990

9091
if (false == empty($config['client']['traceable_producer'])) {
91-
$producerId = 'enqueue.client.traceable_message_producer';
92+
$producerId = 'enqueue.client.traceable_producer';
9293
$container->register($producerId, TraceableProducer::class)
9394
->setDecoratedService('enqueue.client.producer')
94-
->addArgument(new Reference('enqueue.client.traceable_message_producer.inner'))
95+
->addArgument(new Reference('enqueue.client.traceable_producer.inner'))
9596
;
9697
}
9798

pkg/enqueue-bundle/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Integrates message queue components to Symfony application.
99

1010
## Resources
1111

12+
* [Quick tour](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
1213
* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
1314
* [Questions](https://gitter.im/php-enqueue/Lobby)
1415
* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)

pkg/enqueue-bundle/Resources/config/client.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ services:
99
- '@enqueue.client.driver'
1010
- '@enqueue.client.extensions'
1111

12+
enqueue.client.spool_producer:
13+
class: 'Enqueue\Client\SpoolProducer'
14+
arguments:
15+
- '@enqueue.client.producer'
16+
1217
enqueue.client.extensions:
1318
class: 'Enqueue\Client\ChainExtension'
1419
public: false
@@ -18,6 +23,9 @@ services:
1823
enqueue.producer:
1924
alias: 'enqueue.client.producer'
2025

26+
enqueue.spool_producer:
27+
alias: 'enqueue.client.spool_producer'
28+
2129
enqueue.client.rpc_client:
2230
class: 'Enqueue\Client\RpcClient'
2331
arguments:
@@ -123,3 +131,10 @@ services:
123131
name: 'data_collector'
124132
template: 'EnqueueBundle:Profiler:panel.html.twig'
125133
id: 'enqueue.message_queue'
134+
135+
enqueue.flush_spool_producer_listener:
136+
class: 'Enqueue\Symfony\Client\FlushSpoolProducerListener'
137+
arguments:
138+
- '@enqueue.client.spool_producer'
139+
tags:
140+
- { name: 'kernel.event_subscriber' }
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
services:
2+
enqueue.client.flush_spool_producer_extension:
3+
class: 'Enqueue\Client\ConsumptionExtension\FlushSpoolProducerExtension'
4+
public: false
5+
arguments:
6+
- '@enqueue.client.spool_producer'
7+
tags:
8+
- { name: 'enqueue.consumption.extension', priority: -100 }
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional\Client;
4+
5+
use Enqueue\Bundle\Tests\Functional\WebTestCase;
6+
use Enqueue\Client\SpoolProducer;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class SpoolProducerTest extends WebTestCase
12+
{
13+
public function testCouldBeGetFromContainerAsService()
14+
{
15+
$producer = $this->container->get('enqueue.client.spool_producer');
16+
17+
$this->assertInstanceOf(SpoolProducer::class, $producer);
18+
}
19+
20+
public function testCouldBeGetFromContainerAsShortenAlias()
21+
{
22+
$producer = $this->container->get('enqueue.client.spool_producer');
23+
$aliasProducer = $this->container->get('enqueue.spool_producer');
24+
25+
$this->assertSame($producer, $aliasProducer);
26+
}
27+
}

pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public function testShouldUseTraceableMessageProducerIfTraceableProducerOptionSe
246246
],
247247
]], $container);
248248

249-
$producer = $container->getDefinition('enqueue.client.traceable_message_producer');
249+
$producer = $container->getDefinition('enqueue.client.traceable_producer');
250250
self::assertEquals(TraceableProducer::class, $producer->getClass());
251251
self::assertEquals(
252252
['enqueue.client.producer', null, 0],
@@ -255,7 +255,7 @@ public function testShouldUseTraceableMessageProducerIfTraceableProducerOptionSe
255255

256256
self::assertInstanceOf(Reference::class, $producer->getArgument(0));
257257
self::assertEquals(
258-
'enqueue.client.traceable_message_producer.inner',
258+
'enqueue.client.traceable_producer.inner',
259259
(string) $producer->getArgument(0)
260260
);
261261
}

0 commit comments

Comments
 (0)