Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add events
  • Loading branch information
michalsn committed Oct 10, 2025
commit ec278e75031f2369b6b55c1fb0400a9fb4fa5f94
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
"php": "^8.1"
},
"require-dev": {
"codeigniter4/devkit": "^1.0",
"codeigniter4/devkit": "^1.3",
"codeigniter4/framework": "^4.3",
"predis/predis": "^2.0",
"phpstan/phpstan-strict-rules": "^1.5",
"phpstan/phpstan-strict-rules": "^2.0",
"php-amqplib/php-amqplib": "^3.7"
},
"minimum-stability": "dev",
Expand Down
1 change: 1 addition & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
<directory suffix=".php">./src/</directory>
</include>
<exclude>
<file>./src/Compatibility/SignalTrait.php</file>
<directory suffix=".php">./src/Commands/Generators</directory>
<directory suffix=".php">./src/Commands/Utils</directory>
<directory suffix=".php">./src/Config</directory>
Expand Down
132 changes: 129 additions & 3 deletions src/Commands/QueueWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@

use CodeIgniter\CLI\BaseCommand;
use CodeIgniter\CLI\CLI;
use CodeIgniter\Queue\Compatibility\SignalTrait;
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Events\QueueEventManager;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use Exception;
use Throwable;

class QueueWork extends BaseCommand
{
use SignalTrait;

/**
* The Command's Group
*
* @var string
*/
protected $group = 'Queue';

/**
* Worker ID for tracking this worker instance
*/
private string $workerId;

/**
* The Command's Name
*
Expand Down Expand Up @@ -126,6 +135,9 @@ public function run(array $params)

$startTime = microtime(true);

// Generate unique worker ID
$this->workerId = sprintf('worker-%s-%d', gethostname(), getmypid());

CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan'), 'cyan');

if ($priority !== 'default') {
Expand All @@ -134,15 +146,38 @@ public function run(array $params)

CLI::write(PHP_EOL);

// Convert priority string to array
$priority = array_map('trim', explode(',', (string) $priority));

while (true) {
// Register signals for graceful shutdown
$this->registerSignals();

// Emit worker started event
QueueEventManager::workerStarted(
handler: service('queue')->name(),
queue: $queue,
priorities: $priority,
config: [
'max_jobs' => $maxJobs,
'max_time' => $maxTime,
'memory_limit' => $memory . 'MB',
'sleep' => $sleep,
'rest' => $rest,
],
metadata: [
'worker_id' => $this->workerId,
],
);

while ($this->isRunning()) {
$work = service('queue')->pop($queue, $priority);

if ($work === null) {
if ($stopWhenEmpty) {
CLI::write('No job available. Stopping.', 'yellow');

$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'empty_queue');

return EXIT_SUCCESS;
}

Expand All @@ -154,14 +189,26 @@ public function run(array $params)
sleep((int) $sleep);

if ($this->checkMemory($memory)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'memory_limit');

return EXIT_SUCCESS;
}

if ($this->shouldTerminate()) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'signal_stop');

return EXIT_SUCCESS;
}

if ($this->checkStop($queue, $startTime)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'planned_stop');

return EXIT_SUCCESS;
}

if ($this->maxTimeCheck($maxTime, $startTime)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'time_limit');

return EXIT_SUCCESS;
}
} else {
Expand All @@ -175,19 +222,33 @@ public function run(array $params)

$this->handleWork($work, $config, $tries, $retryAfter);

if ($this->shouldTerminate()) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'signal_stop');

return EXIT_SUCCESS;
}

if ($this->checkMemory($memory)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'memory_limit');

return EXIT_SUCCESS;
}

if ($this->checkStop($queue, $startTime)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'planned_stop');

return EXIT_SUCCESS;
}

if ($this->maxJobsCheck($maxJobs, $countJobs)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'job_limit');

return EXIT_SUCCESS;
}

if ($this->maxTimeCheck($maxTime, $startTime)) {
$this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'time_limit');

return EXIT_SUCCESS;
}

Expand Down Expand Up @@ -237,10 +298,21 @@ private function readOptions(array $params, QueueConfig $config, string $queue):
private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?int $retryAfter): void
{
timer()->start('work');
$payload = $work->payload;
$startTime = microtime(true);
$payload = $work->payload;

$payloadMetadata = null;

// Emit job processing started event
QueueEventManager::jobProcessingStarted(
handler: service('queue')->name(),
queue: $work->queue,
job: $work,
metadata: [
'worker_id' => $this->workerId,
],
);

try {
// Load payload metadata
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);
Expand All @@ -253,7 +325,18 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
$job->process();

// Mark as done
service('queue')->done($work, $config->keepDoneJobs);
service('queue')->done($work);

// Emit job processing completed event
QueueEventManager::jobProcessingCompleted(
handler: service('queue')->name(),
queue: $work->queue,
job: $work,
processingTime: microtime(true) - $startTime,
metadata: [
'worker_id' => $this->workerId,
],
);

CLI::write('The processing of this job was successful', 'green');

Expand All @@ -265,6 +348,17 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
service('queue')->later($work, $retryAfter ?? $job->getRetryAfter());
} else {
// Mark as failed
QueueEventManager::jobFailed(
handler: service('queue')->name(),
queue: $work->queue,
job: $work,
exception: $err,
processingTime: microtime(true) - $startTime,
metadata: [
'worker_id' => $this->workerId,
],
);

service('queue')->failed($work, $err, $config->keepFailedJobs);
}
CLI::write('The processing of this job failed', 'red');
Expand Down Expand Up @@ -395,4 +489,36 @@ private function checkStop(string $queue, float $startTime): bool

return false;
}

/**
* Handle interruption
*/
private function onInterruption(int $signal): void
{
$this->requestTermination();

CLI::write(sprintf('The termination of this worker has been requested with: %s.', $this->getSignalName($signal)), 'yellow');
}

/**
* Emit worker stopped event with runtime statistics
*/
private function emitWorkerStoppedEvent(string $queue, array $priorities, float $startTime, int $jobsProcessed, string $reason): void
{
$uptime = microtime(true) - $startTime;

QueueEventManager::workerStopped(
handler: service('queue')->name(),
queue: $queue,
priorities: $priorities,
uptime: $uptime,
jobsProcessed: $jobsProcessed,
metadata: [
'worker_id' => $this->workerId,
'stop_reason' => $reason,
'memory_usage' => memory_get_usage(true),
'memory_peak' => memory_get_peak_usage(true),
],
);
}
}
Loading