Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions src/main/scala/com/redis/RedisClientSettings.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.redis

import RedisClientSettings._
import java.lang.{Long => JLong}

import RedisClientSettings._

case class RedisClientSettings(
backpressureBufferSettings: Option[BackpressureBufferSettings] = None,
reconnectionSettings: Option[ReconnectionSettings] = None
reconnectionSettings: ReconnectionSettings = NoReconnectionSettings
)

object RedisClientSettings {
Expand All @@ -24,32 +25,62 @@ object RedisClientSettings {
def newSchedule: ReconnectionSchedule

trait ReconnectionSchedule {
val maxAttempts: Long
var attempts = 0

/**
* Gets the number of milliseconds until the next reconnection attempt.
*
* This method is expected to increment attempts like an iterator
*
* @return milliseconds until the next attempt
*/
def nextDelayMs: Long
}
}

case class ConstantReconnectionSettings(constantDelayMs: Long) extends ReconnectionSettings {
case object NoReconnectionSettings extends ReconnectionSettings{
def newSchedule: ReconnectionSchedule = new ReconnectionSchedule {
val maxAttempts: Long = 0
def nextDelayMs: Long = throw new NoSuchElementException("No delay available")
}
}

case class ConstantReconnectionSettings(constantDelayMs: Long, maximumAttempts: Long = Long.MaxValue) extends ReconnectionSettings {
require(constantDelayMs >= 0, s"Invalid negative reconnection delay (received $constantDelayMs)")
require(maximumAttempts >= 0, s"Invalid negative maximum attempts (received $maximumAttempts)")

def newSchedule: ReconnectionSchedule = new ConstantSchedule

class ConstantSchedule extends ReconnectionSchedule {
def nextDelayMs = constantDelayMs
val maxAttempts = maximumAttempts
def nextDelayMs = {
attempts += 1
constantDelayMs
}
}
}

case class ExponentialReconnectionPolicy(baseDelayMs: Long, maxDelayMs: Long) extends ReconnectionSettings {
case class ExponentialReconnectionSettings(baseDelayMs: Long, maxDelayMs: Long, maximumAttempts: Long = Long.MaxValue) extends ReconnectionSettings {
require(baseDelayMs > 0, s"Base reconnection delay must be greater than 0. Received $baseDelayMs")
require(maxDelayMs > 0, s"Maximum reconnection delay must be greater than 0. Received $maxDelayMs")
require(maxDelayMs >= baseDelayMs, "Maximum reconnection delay cannot be smaller than base reconnection delay")

def newSchedule = new ExponentialSchedule

private val ceil = if ((baseDelayMs & (baseDelayMs - 1)) == 0) 0 else 1
private val attemptCeiling = JLong.SIZE - JLong.numberOfLeadingZeros(Long.MaxValue / baseDelayMs) - ceil

class ExponentialSchedule extends ReconnectionSchedule {
var attempts = 0
val maxAttempts = maximumAttempts
def nextDelayMs = {
attempts += 1
Math.min(baseDelayMs * (1L << attempts), maxDelayMs)
if (attempts > attemptCeiling) {
maxDelayMs
} else {
val factor = 1L << (attempts - 1)
Math.min(baseDelayMs * factor, maxDelayMs)
}
}
}
}
Expand Down
45 changes: 22 additions & 23 deletions src/main/scala/com/redis/RedisConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis
context watch pipe

case CommandFailed(c: Connect) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if a command failure always means disconnection. As we already have termination management, another failure handling strategy might be needed IMHO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think that means in this block the IO(Tcp) ! Connect failed. Wouldn't the same reconnection strategy apply to failed connections?

I did remove the context become unconnected since we are already in that state.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually CommandFailed results from Connect message to the TCP manager. I think specifying an incorrect InetSocketAddress will also result in a CommandFailed message. CommandFailed can come also from failed writes due to throttling. See http://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html#throttling-reads-and-writes. So it's not necessarily disconnection.

settings.reconnectionSettings match {
case Some(r) =>
if (reconnectionSchedule.isEmpty) {
reconnectionSchedule = Some(settings.reconnectionSettings.get.newSchedule)
}
val delayMs = reconnectionSchedule.get.nextDelayMs
log.error("Connect failed for {} with {}. Reconnecting in {} ms... ", c.remoteAddress, c.failureMessage, delayMs)
context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self)
case None =>
log.error("Connect failed for {} with {}. Stopping... ", c.remoteAddress, c.failureMessage)
context stop self
if (reconnectionSchedule.isEmpty) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep it as an Option? As we have generalized maxAttempts, just a lazy val would be sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I'll make those changes.

reconnectionSchedule = Some(settings.reconnectionSettings.newSchedule)
}
if (reconnectionSchedule.get.attempts < reconnectionSchedule.get.maxAttempts) {
val delayMs = reconnectionSchedule.get.nextDelayMs
log.error("Connect failed for {} with {}. Reconnecting in {} ms... ", c.remoteAddress, c.failureMessage, delayMs)
context become unconnected
context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self)
} else {
log.error("Connect failed for {} with {}. Stopping... ", c.remoteAddress, c.failureMessage)
context stop self
}
}

Expand Down Expand Up @@ -123,18 +123,17 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis

def withTerminationManagement(handler: Receive): Receive = handler orElse {
case Terminated(x) => {
settings.reconnectionSettings match {
case Some(r) =>
if (reconnectionSchedule.isEmpty) {
reconnectionSchedule = Some(settings.reconnectionSettings.get.newSchedule)
}
val delayMs = reconnectionSchedule.get.nextDelayMs
log.error("Child termination detected: {}. Reconnecting in {} ms... ", x, delayMs)
context become unconnected
context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self)
case None =>
log.error("Child termination detected: {}", x)
context stop self
if (reconnectionSchedule.isEmpty) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

reconnectionSchedule = Some(settings.reconnectionSettings.newSchedule)
}
if (reconnectionSchedule.get.attempts < reconnectionSchedule.get.maxAttempts) {
val delayMs = reconnectionSchedule.get.nextDelayMs
log.error("Child termination detected: {}. Reconnecting in {} ms... ", x, delayMs)
context become unconnected
context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self)
} else {
log.error("Child termination detected: {}", x)
context stop self
}
}
}
Expand Down
38 changes: 25 additions & 13 deletions src/test/scala/com/redis/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package com.redis

import scala.concurrent.Future

import akka.testkit.TestProbe
import org.junit.runner.RunWith
import org.scalatest.exceptions.TestFailedException
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith

import serialization._


@RunWith(classOf[JUnitRunner])
class ClientSpec extends RedisSpecBase {

Expand Down Expand Up @@ -75,21 +74,34 @@ class ClientSpec extends RedisSpecBase {
}

describe("reconnections based on policy") {
it("should reconnect") {
val key = "reconnect_test"

client.lpush(key, 0)

it("should not reconnect by default") {
val probe = TestProbe()
probe watch client.clientRef
// Extract our address
// TODO Cleaner address extraction, perhaps in ServerOperations.client?
val address = client.client.list().futureValue.get.toString.split(" ").head.split("=").last
client.client.kill(address).futureValue should be (true)
client.client.kill(address).futureValue should be(true)
probe.expectTerminated(client.clientRef)
}

client.lpush(key, 1 to 100).futureValue should equal (101)
val list = client.lrange[Long](key, 0, -1).futureValue
it("should reconnect with settings") {
withReconnectingClient {
client =>
val key = "reconnect_test"

list.size should equal (101)
list.reverse should equal (0 to 100)
client.lpush(key, 0)

// Extract our address
// TODO Cleaner address extraction, perhaps in ServerOperations.client?
val address = client.client.list().futureValue.get.toString.split(" ").head.split("=").last
client.client.kill(address).futureValue should be(true)

client.lpush(key, 1 to 100).futureValue should equal(101)
val list = client.lrange[Long](key, 0, -1).futureValue

list.size should equal(101)
list.reverse should equal(0 to 100)
}
}
}
}
19 changes: 12 additions & 7 deletions src/test/scala/com/redis/RedisSpecBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,36 @@ package com.redis

import scala.concurrent.duration._

import akka.util.Timeout
import akka.actor._
import akka.testkit.TestKit
import akka.util.Timeout
import com.redis.RedisClientSettings.ConstantReconnectionSettings
import org.scalatest._
import org.scalatest.concurrent.{Futures, ScalaFutures}
import org.scalatest.time._

trait RedisSpecBase extends FunSpec
class RedisSpecBase(_system: ActorSystem) extends TestKit(_system)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :)

with FunSpecLike
with Matchers
with Futures
with ScalaFutures
with BeforeAndAfterEach
with BeforeAndAfterAll {
import RedisSpecBase._

// Akka setup
implicit val system = ActorSystem("redis-test-"+ iter.next)
def this() = this(ActorSystem("redis-test-"+ RedisSpecBase.iter.next))
implicit val executionContext = system.dispatcher
implicit val timeout = Timeout(2 seconds)

// Scalatest setup
implicit val defaultPatience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(5, Millis))

// Redis client setup
val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = Some(ConstantReconnectionSettings(1000))))
val client = RedisClient("localhost", 6379)

def withReconnectingClient(testCode: RedisClient => Any) = {
val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = ConstantReconnectionSettings(1000)))
testCode(client)
}

override def beforeEach = {
client.flushdb()
Expand All @@ -50,6 +55,6 @@ trait RedisSpecBase extends FunSpec

object RedisSpecBase {

private val iter = Iterator from 0
val iter = Iterator from 0

}