Skip to content

Commit eab89a6

Browse files
authored
Merge pull request swooletw#136 from damonto/feature/swoole_async_task
Supported Swoole async queue driver
2 parents 29bf931 + 7828ef2 commit eab89a6

File tree

11 files changed

+395
-24
lines changed

11 files changed

+395
-24
lines changed

.github/ISSUE_TEMPLATE.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
Please answer these questions before submitting your issue. Thanks!
1+
Make sure you read **Issues Guideline** and answer these questions before submitting your issue. Thanks!
2+
(Any **non-English** issues will be closed immediately.)
23

34
1. Please provide your PHP and Swoole version. (`php -v` and `php --ri swoole`)
45

README.md

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ This package provides a high performance HTTP server to speed up your Laravel/Lu
1616
## Features
1717

1818
* Run **Laravel/Lumen** application on top of **Swoole**.
19-
* Outstanding performance boosting up to **30x**.
19+
* Outstanding performance boosting up to **5x** faster.
2020
* Sandbox mode to isolate app container.
2121
* Support running websocket server in **Laravel**.
2222
* Support `Socket.io` protocol.
@@ -28,7 +28,8 @@ Please see [Wiki](https://github.com/swooletw/laravel-swoole/wiki)
2828

2929
## Benchmark
3030

31-
Test with clean Lumen 5.5, using MacBook Air 13, 2015.
31+
Test with clean Lumen 5.6, using DigitalOcean 3 CPUs / 1 GB Memory / PHP 7.2 / Ubuntu 16.04.4 x64
32+
3233
Benchmarking Tool: [wrk](https://github.com/wg/wrk)
3334

3435
```
@@ -38,27 +39,31 @@ wrk -t4 -c100 http://your.app
3839
### Nginx with FPM
3940

4041
```
41-
Running 10s test @ http://lumen.app:9999
42-
4 threads and 100 connections
42+
wrk -t4 -c10 http://lumen-swoole.local
43+
44+
Running 10s test @ http://lumen-swoole.local
45+
4 threads and 10 connections
4346
Thread Stats Avg Stdev Max +/- Stdev
44-
Latency 1.14s 191.03ms 1.40s 90.31%
45-
Req/Sec 22.65 10.65 50.00 65.31%
46-
815 requests in 10.07s, 223.65KB read
47-
Requests/sec: 80.93
48-
Transfer/sec: 22.21KB
47+
Latency 6.41ms 1.56ms 19.71ms 71.32%
48+
Req/Sec 312.99 28.71 373.00 72.00%
49+
12469 requests in 10.01s, 3.14MB read
50+
Requests/sec: 1245.79
51+
Transfer/sec: 321.12KB
4952
```
5053

5154
### Swoole HTTP Server
5255

5356
```
54-
Running 10s test @ http://127.0.0.1:1215
55-
4 threads and 100 connections
57+
wrk -t4 -c10 http://lumen-swoole.local:1215
58+
59+
Running 10s test @ http://lumen-swoole.local:1215
60+
4 threads and 10 connections
5661
Thread Stats Avg Stdev Max +/- Stdev
57-
Latency 11.58ms 4.74ms 68.73ms 81.63%
58-
Req/Sec 2.19k 357.43 2.90k 69.50%
59-
87879 requests in 10.08s, 15.67MB read
60-
Requests/sec: 8717.00
61-
Transfer/sec: 1.55MB
62+
Latency 2.39ms 4.88ms 105.21ms 94.55%
63+
Req/Sec 1.26k 197.13 1.85k 68.75%
64+
50248 requests in 10.02s, 10.88MB read
65+
Requests/sec: 5016.94
66+
Transfer/sec: 1.09MB
6267
```
6368

6469
## Q&A

src/HttpServiceProvider.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use SwooleTW\Http\Commands\HttpServerCommand;
1010
use Swoole\Websocket\Server as WebsocketServer;
1111
use SwooleTW\Http\Coroutine\Connectors\MySqlConnector;
12+
use SwooleTW\Http\Task\Connectors\SwooleTaskConnector;
1213

1314
/**
1415
* @codeCoverageIgnore
@@ -45,6 +46,7 @@ public function register()
4546
$this->registerManager();
4647
$this->registerCommands();
4748
$this->registerDatabaseDriver();
49+
$this->registerSwooleQueueDriver();
4850
}
4951

5052
/**
@@ -129,7 +131,7 @@ protected function configureSwooleServer()
129131
$config = $this->app['config']->get('swoole_http.server.options');
130132

131133
// only enable task worker in websocket mode
132-
if (! $this->isWebsocket) {
134+
if (env('QUEUE_DRIVER') !== 'swoole' && ! $this->isWebsocket) {
133135
unset($config['task_worker_num']);
134136
}
135137

@@ -174,4 +176,16 @@ protected function registerDatabaseDriver()
174176
});
175177
});
176178
}
179+
180+
/**
181+
* Register queue driver for swoole async task.
182+
*/
183+
protected function registerSwooleQueueDriver()
184+
{
185+
$this->app->afterResolving('queue', function ($manager) {
186+
$manager->addConnector('swoole', function () {
187+
return new SwooleTaskConnector(static::$server);
188+
});
189+
});
190+
}
177191
}

src/Server/Manager.php

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Throwable;
66
use Swoole\Http\Server;
77
use SwooleTW\Http\Server\Sandbox;
8+
use SwooleTW\Http\Task\SwooleTaskJob;
89
use Illuminate\Support\Facades\Facade;
910
use SwooleTW\Http\Websocket\Websocket;
1011
use SwooleTW\Http\Transformers\Request;
@@ -228,13 +229,22 @@ public function onTask($server, $taskId, $srcWorkerId, $data)
228229
$this->container['events']->fire('swoole.task', func_get_args());
229230

230231
try {
231-
// push websocket message
232-
if ($this->isWebsocket
233-
&& array_key_exists('action', $data)
234-
&& $data['action'] === Websocket::PUSH_ACTION) {
235-
$this->pushMessage($server, $data['data'] ?? []);
232+
if (is_array($data)) {
233+
// push websocket message
234+
if ($this->isWebsocket
235+
&& array_key_exists('action', $data)
236+
&& $data['action'] === Websocket::PUSH_ACTION) {
237+
$this->pushMessage($server, $data['data'] ?? []);
238+
}
239+
} elseif (is_string($data)) {
240+
$decoded= json_decode($data, true);
241+
242+
if (JSON_ERROR_NONE === json_last_error() && isset($decoded['job'])) {
243+
(new SwooleTaskJob($this->container, $server, $data, $taskId, $srcWorkerId))->fire();
244+
}
236245
}
237246
} catch (Throwable $e) {
247+
dump($e);
238248
$this->logServerError($e);
239249
}
240250
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace SwooleTW\Http\Task\Connectors;
4+
5+
use SwooleTW\Http\Task\SwooleTaskQueue;
6+
use Illuminate\Queue\Connectors\ConnectorInterface;
7+
8+
class SwooleTaskConnector implements ConnectorInterface
9+
{
10+
/**
11+
* Swoole Server Instance
12+
*
13+
* @var \Swoole\Http\Server
14+
*/
15+
protected $swoole;
16+
17+
/**
18+
* Create a new Swoole Async task connector instance.
19+
*
20+
* @param \Swoole\Http\Server $swoole
21+
* @return void
22+
*/
23+
public function __construct($swoole)
24+
{
25+
$this->swoole = $swoole;
26+
}
27+
28+
/**
29+
* Establish a queue connection.
30+
*
31+
* @param array $config
32+
* @return \Illuminate\Contracts\Queue\Queue
33+
*/
34+
public function connect(array $config)
35+
{
36+
return new SwooleTaskQueue($this->swoole);
37+
}
38+
}

src/Task/SwooleTaskJob.php

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
<?php
2+
3+
namespace SwooleTW\Http\Task;
4+
5+
use Illuminate\Queue\Jobs\Job;
6+
use Illuminate\Contracts\Container\Container;
7+
use Illuminate\Contracts\Queue\Job as JobContract;
8+
9+
class SwooleTaskJob extends Job implements JobContract
10+
{
11+
/**
12+
* The Swoole Server instance.
13+
*
14+
* @var \Swoole\Http\Server
15+
*/
16+
protected $swoole;
17+
18+
/**
19+
* The Swoole async job raw payload.
20+
*
21+
* @var array
22+
*/
23+
protected $job;
24+
25+
/**
26+
* The Task id
27+
*
28+
* @var int
29+
*/
30+
protected $taskId;
31+
32+
/**
33+
* The src worker Id
34+
*
35+
* @var int
36+
*/
37+
protected $srcWrokerId;
38+
39+
/**
40+
* Create a new job instance.
41+
*
42+
* @param \Illuminate\Container\Container $container
43+
* @param \Swoole\Http\Server $swoole
44+
* @param string $job
45+
* @return void
46+
*/
47+
public function __construct(Container $container, $swoole, $job, $taskId, $srcWrokerId)
48+
{
49+
$this->container = $container;
50+
$this->swoole = $swoole;
51+
$this->job = $job;
52+
$this->taskId = $taskId;
53+
$this->srcWorkderId = $srcWrokerId;
54+
}
55+
56+
/**
57+
* Fire the job.
58+
*
59+
* @return void
60+
*/
61+
public function fire()
62+
{
63+
if (method_exists($this, 'resolveAndFire')) {
64+
$this->resolveAndFire(json_decode($this->getRawBody(), true));
65+
} else {
66+
parent::fire();
67+
}
68+
}
69+
70+
/**
71+
* Get the number of times the job has been attempted.
72+
* @return int
73+
*/
74+
public function attempts()
75+
{
76+
return ($this->job['attempts'] ?? null) + 1;
77+
}
78+
79+
/**
80+
* Get the raw body string for the job.
81+
* @return string
82+
*/
83+
public function getRawBody()
84+
{
85+
return $this->job;
86+
}
87+
88+
89+
/**
90+
* Get the job identifier.
91+
* @return string
92+
*/
93+
public function getJobId()
94+
{
95+
return $this->taskId;
96+
}
97+
98+
/**
99+
* Get the service container instance.
100+
*
101+
* @return \Illuminate\Container\Container
102+
*/
103+
public function getContainer()
104+
{
105+
return $this->container;
106+
}
107+
}

0 commit comments

Comments
 (0)