Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
208511b
Merge pull request #81 from swooletw/develop
albertcht Jun 12, 2018
51dea74
Merge pull request #84 from swooletw/develop
albertcht Jun 12, 2018
9adb5bf
remove ext-swoole dependency from composer.json becauseof unknown com…
albertcht Jun 12, 2018
32102cf
Merge pull request #87 from swooletw/develop
albertcht Jun 12, 2018
82b0913
fix sandbox enable
albertcht Jun 12, 2018
7e1a9cb
Merge pull request #88 from swooletw/develop
albertcht Jun 12, 2018
b30fc40
Update README.md
albertcht Jun 16, 2018
3b816be
fix parser test in swoole 4.0
albertcht Jun 16, 2018
e5f1cf7
Merge pull request #91 from swooletw/develop
albertcht Jun 16, 2018
b32376c
Update ISSUE_TEMPLATE.md
albertcht Jul 21, 2018
d30f44b
Update ISSUE_TEMPLATE.md
albertcht Jul 27, 2018
7c11d75
Update ISSUE_TEMPLATE.md
albertcht Jul 28, 2018
39229ef
Supported Swoole async task queue driver
damonto Sep 1, 2018
0e97d59
Fix AsyncTaskJob not found
damonto Sep 1, 2018
214b619
Remove ServiceProvider
damonto Sep 1, 2018
db8ffe2
Fix unable create swoole_server
damonto Sep 1, 2018
fc59fa9
fix conflicts
damonto Sep 2, 2018
4034bf3
fix conflicts again
damonto Sep 2, 2018
b7249f6
Merged coroutine feature
damonto Sep 2, 2018
83bcd34
Add phpunit test cases
damonto Sep 2, 2018
5783325
Change comments format
damonto Sep 2, 2018
58087d0
Remove getJobExpiration because of unused
damonto Sep 2, 2018
362fbc0
Don't override createObjectPayload
damonto Sep 2, 2018
d4656bf
Compatible with older versions
damonto Sep 2, 2018
a840966
Fix undefined method getContainer()
damonto Sep 2, 2018
1544615
Move getContainer method to SwooleTaskJob
damonto Sep 2, 2018
6792ed8
Uncomment code
damonto Sep 2, 2018
18c8444
Swoole_timer_after first parameter is microsecond
damonto Sep 2, 2018
7828ef2
Merge branch 'feature/coroutine_feature' into feature/swoole_async_task
albertcht Sep 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Supported Swoole async task queue driver
  • Loading branch information
damonto committed Sep 1, 2018
commit 39229efcd82827c43517a99999d578d2a84d58a1
87 changes: 87 additions & 0 deletions src/AsyncTask/AsyncTaskJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

namespace SwooleTW\Http\AsyncTask;

use Illuminate\Queue\Jobs\Job;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;

class AsyncTaskJob extends Job implements JobContract
{
/**
* The Swoole Server instance.
*
* @var \Swoole\Http\Server
*/
protected $swoole;

/**
* The Swoole async job raw payload.
*
* @var array
*/
protected $job;

/**
* The Task id
*
* @var int
*/
protected $taskId;

/**
* The src worker Id
*
* @var int
*/
protected $srcWrokerId;

/**
* Create a new job instance.
*
* @param \Illuminate\Container\Container $container
* @param \Swoole\Http\Server $swoole
* @param string $job
*
* @return void
*/
public function __construct(Container $container, $swoole, $job, $taskId, $srcWrokerId)
{
$this->container = $container;
$this->swoole = $swoole;
$this->job = $job;
$this->taskId = $taskId;
$this->srcWorkderId = $srcWrokerId;
}

/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function attempts()
{
return ($this->job['attempts'] ?? null) + 1;
}

/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job;
}


/**
* Get the job identifier.
*
* @return string
*/
public function getJobId()
{
return $this->taskId;
}
}
124 changes: 124 additions & 0 deletions src/AsyncTask/AsyncTaskQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace SwooleTW\Http\AsyncTask;

use Illuminate\Queue\Queue;
use Illuminate\Contracts\Queue\Queue as QueueContract;

class AsyncTaskQueue extends Queue implements QueueContract
{
/**
* Swoole Connector
*
* @var \Swoole\Http\Server
*/
protected $swoole;

/**
* Create Async Task instance.
*
* @param \Swoole\Http\Server $swoole
*/
public function __construct($swoole)
{
$this->swoole = $swoole;
}


/**
* Push a new job onto the queue.
*
* @param string|object $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}

/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->swoole->task($payload, $queue);
}

/**
* Push a new job onto the queue after a delay.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string|object $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
return swoole_timer_after($this->secondsUntil($delay), function () use ($job, $data, $queue) {
return $this->push($job, $data, $queue);
});
}

/**
* Create a payload for an object-based queue handler.
*
* @param mixed $job
* @return array
*/
protected function createObjectPayload($job)
{
return [
'maxTries' => $job->tries ?? null,
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job)
]
];
}

/**
* Create a typical, string based queue payload array.
*
* @param string $job
* @param mixed $data
*
* @throws Expcetion
*/
protected function createStringPayload($job, $data)
{
throw new Exception("Unsupported empty data");
}

/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
return -1;
}

/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
return null;
}
}
38 changes: 38 additions & 0 deletions src/AsyncTask/Connectors/AsyncTaskConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace SwooleTW\Http\AsyncTask\Connectors;

use SwooleTW\Http\AsyncTask\AsyncTaskQueue;
use Illuminate\Queue\Connectors\ConnectorInterface;

class AsyncTaskConnector implements ConnectorInterface
{
/**
* Swoole Server Instance
*
* @var object
*/
protected $swoole;

/**
* Create a new Swoole Async task connector instance.
*
* @param object $swoole
* @return void
*/
public function __construct($swoole)
{
$this->swoole = $swoole;
}

/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new AsyncTaskQueue($this->swoole);
}
}
29 changes: 29 additions & 0 deletions src/AsyncTask/ServiceProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace SwooleTW\Http\AsyncTask;

use SwooleTW\Http\AsyncTask\Connectors\AsyncTaskConnector;

class ServiceProvider extends \Illuminate\Support\ServiceProvider
{
/**
* Indicates if loading of the provider is deferred.
*
* @var bool
*/
protected $defer = false;

/**
* Add the connector to the queue drivers.
*
* @return void
*/
public function boot()
{
$swooleServer = $this->app['swoole.http'];

$this->app['queue']->addConnector('swoole_async_task', function () {
return new AsyncTaskConnector($this->app['swoole.http']->getServer());
});
}
}
26 changes: 20 additions & 6 deletions src/Server/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
use SwooleTW\Http\Table\CanSwooleTable;
use SwooleTW\Http\Websocket\CanWebsocket;
use Illuminate\Contracts\Container\Container;
use SwooleTW\Http\Websocket\Rooms\RoomContract;
use Swoole\WebSocket\Server as WebSocketServer;
use SwooleTW\Http\Websocket\Rooms\RoomContract;
use Illuminate\Contracts\Debug\ExceptionHandler;

class Manager
Expand Down Expand Up @@ -331,11 +331,15 @@ public function onTask(HttpServer $server, $taskId, $srcWorkerId, $data)
$this->container['events']->fire('swoole.task', func_get_args());

try {
// push websocket message
if ($this->isWebsocket
&& array_key_exists('action', $data)
&& $data['action'] === Websocket::PUSH_ACTION) {
$this->pushMessage($server, $data['data'] ?? []);
if (is_array($data)) {
// push websocket message
if ($this->isWebsocket
&& array_key_exists('action', $data)
&& $data['action'] === Websocket::PUSH_ACTION) {
$this->pushMessage($server, $data['data'] ?? []);
}
} else {
(new AsyncTaskJob($this->container, $server, $data, $taskId, $srcWorkerId))->fire();
}
} catch (Exception $e) {
$this->logServerError($e);
Expand Down Expand Up @@ -494,4 +498,14 @@ protected function logServerError(Exception $e)
{
$this->app[ExceptionHandler::class]->report($e);
}

/**
* Get the server instance.
*
* @return Swoole\Http\Server
*/
public function getServer()
{
return $this->server;
}
}