Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add circuit breaker job middleware
  • Loading branch information
paras-malhotra committed Mar 4, 2021
commit 0ddc7ab7e94003be1eec3080fce6aea3382912e6
129 changes: 129 additions & 0 deletions src/Illuminate/Queue/Middleware/CircuitBreaker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Cache\RateLimiter;
use Illuminate\Container\Container;
use Throwable;

class CircuitBreaker
{
/**
* The maximum number of attempts allowed before the circuit is opened.
*
* @var int
*/
protected $maxAttempts;

/**
* The number of minutes until the maximum attempts are reset.
*
* @var int
*/
protected $decayMinutes;

/**
* The number of minutes to wait before retrying the job after an exception.
*
* @var int
*/
protected $retryAfterMinutes;

/**
* The rate limiter key.
*
* @var string
*/
protected $key;

/**
* The prefix of the rate limiter key.
*
* @var string
*/
protected $prefix = 'circuit_breaker:';

/**
* The rate limiter instance.
*
* @var \Illuminate\Cache\RateLimiter
*/
protected $limiter;

/**
* Create a new middleware instance.
*
* @param int $maxAttempts
* @param int $decayMinutes
* @param int $retryAfterMinutes
* @param string $key
*/
public function __construct($maxAttempts = 10, $decayMinutes = 10, $retryAfterMinutes = 0, string $key = '')
{
$this->maxAttempts = $maxAttempts;
$this->decayMinutes = $decayMinutes;
$this->retryAfterMinutes = $retryAfterMinutes;
$this->key = $key;
}

/**
* Process the job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
$this->limiter = Container::getInstance()->make(RateLimiter::class);

if ($this->limiter->tooManyAttempts($jobKey = $this->getKey($job), $this->maxAttempts)) {
return $job->release($this->getTimeUntilNextRetry($jobKey));
}

try {
$next($job);

$this->limiter->clear($jobKey);
} catch (Throwable $throwable) {
$this->limiter->hit($jobKey, $this->decayMinutes * 60);

return $job->release($this->retryAfterMinutes * 60);
}
}

/**
* Set the prefix of the rate limiter key.
*
* @param string $prefix
* @return $this
*/
public function withPrefix(string $prefix)
{
$this->prefix = $prefix;

return $this;
}

/**
* Get the number of seconds that should elapse before the job is retried.
*
* @param string $key
* @return int
*/
protected function getTimeUntilNextRetry($key)
{
return $this->limiter->availableIn($key) + 3;
}

/**
* Get the cache key associated for the rate limiter.
*
* @param mixed $job
* @return string
*/
protected function getKey($job)
{
return md5($this->prefix.(empty($this->key) ? get_class($job) : $this->key));
}
}
144 changes: 144 additions & 0 deletions tests/Integration/Queue/CircuitBreakerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Exception;
use Illuminate\Bus\Dispatcher;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\CallQueuedHandler;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\CircuitBreaker;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class CircuitBreakerTest extends TestCase
{
protected function tearDown(): void
{
parent::tearDown();

m::close();
}

public function testCircuitIsOpenedForJobErrors()
{
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
$this->assertJobWasReleasedWithDelay(CircuitBreakerTestJob::class);
}

public function testCircuitStaysClosedForSuccessfulJobs()
{
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
}

public function testCircuitResetsAfterSuccess()
{
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
$this->assertJobRanSuccessfully(CircuitBreakerSuccessfulJob::class);
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
$this->assertJobWasReleasedImmediately(CircuitBreakerTestJob::class);
$this->assertJobWasReleasedWithDelay(CircuitBreakerTestJob::class);
}

protected function assertJobWasReleasedImmediately($class)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->with(0)->once();
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class),
]);

$this->assertTrue($class::$handled);
}

protected function assertJobWasReleasedWithDelay($class)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->withArgs(function ($delay) {
return $delay >= 600;
})->once();
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class),
]);

$this->assertFalse($class::$handled);
}

protected function assertJobRanSuccessfully($class)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($command = new $class),
]);

$this->assertTrue($class::$handled);
}
}

class CircuitBreakerTestJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function handle()
{
static::$handled = true;

throw new Exception;
}

public function middleware()
{
return [new CircuitBreaker(2, 10, 0, 'test')];
}
}

class CircuitBreakerSuccessfulJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function handle()
{
static::$handled = true;
}

public function middleware()
{
return [new CircuitBreaker(2, 10, 0, 'test')];
}
}