From e4eb99cd325dbf5d6c26d677fdc3f6dceaab75d1 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sat, 19 Apr 2014 12:44:14 -0500 Subject: [PATCH 01/15] Rename ExponentialReconnectionPolicy to ExponentialReconnectionSettings --- src/main/scala/com/redis/RedisClientSettings.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/redis/RedisClientSettings.scala b/src/main/scala/com/redis/RedisClientSettings.scala index 782f1ec..273376a 100644 --- a/src/main/scala/com/redis/RedisClientSettings.scala +++ b/src/main/scala/com/redis/RedisClientSettings.scala @@ -38,7 +38,7 @@ object RedisClientSettings { } } - case class ExponentialReconnectionPolicy(baseDelayMs: Long, maxDelayMs: Long) extends ReconnectionSettings { + case class ExponentialReconnectionSettings(baseDelayMs: Long, maxDelayMs: Long) extends ReconnectionSettings { require(baseDelayMs > 0, s"Base reconnection delay must be greater than 0. Received $baseDelayMs") require(maxDelayMs > 0, s"Maximum reconnection delay must be greater than 0. Received $maxDelayMs") require(maxDelayMs >= baseDelayMs, "Maximum reconnection delay cannot be smaller than base reconnection delay") From 9782806b77fb37eb8868fec392eafaae52c3b0e2 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sat, 19 Apr 2014 14:23:45 -0500 Subject: [PATCH 02/15] Add support for maximum attempts in the reconnection settings --- .../scala/com/redis/RedisClientSettings.scala | 45 ++++++++++++++++--- .../scala/com/redis/RedisConnection.scala | 45 +++++++++---------- 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/src/main/scala/com/redis/RedisClientSettings.scala b/src/main/scala/com/redis/RedisClientSettings.scala index 273376a..84f97f5 100644 --- a/src/main/scala/com/redis/RedisClientSettings.scala +++ b/src/main/scala/com/redis/RedisClientSettings.scala @@ -1,11 +1,12 @@ package com.redis -import RedisClientSettings._ +import java.lang.{Long => JLong} +import RedisClientSettings._ case class RedisClientSettings( backpressureBufferSettings: Option[BackpressureBufferSettings] = None, - reconnectionSettings: Option[ReconnectionSettings] = None + reconnectionSettings: ReconnectionSettings = NoReconnectionSettings ) object RedisClientSettings { @@ -24,32 +25,62 @@ object RedisClientSettings { def newSchedule: ReconnectionSchedule trait ReconnectionSchedule { + val maxAttempts: Long + var attempts = 0 + + /** + * Gets the number of milliseconds until the next reconnection attempt. + * + * This method is expected to increment attempts like an iterator + * + * @return milliseconds until the next attempt + */ def nextDelayMs: Long } } - case class ConstantReconnectionSettings(constantDelayMs: Long) extends ReconnectionSettings { + case object NoReconnectionSettings extends ReconnectionSettings{ + def newSchedule: ReconnectionSchedule = new ReconnectionSchedule { + val maxAttempts: Long = 0 + def nextDelayMs: Long = throw new NoSuchElementException("No delay available") + } + } + + case class ConstantReconnectionSettings(constantDelayMs: Long, maximumAttempts: Long = Long.MaxValue) extends ReconnectionSettings { require(constantDelayMs >= 0, s"Invalid negative reconnection delay (received $constantDelayMs)") + require(maximumAttempts >= 0, s"Invalid negative maximum attempts (received $maximumAttempts)") def newSchedule: ReconnectionSchedule = new ConstantSchedule class ConstantSchedule extends ReconnectionSchedule { - def nextDelayMs = constantDelayMs + val maxAttempts = maximumAttempts + def nextDelayMs = { + attempts += 1 + constantDelayMs + } } } - case class ExponentialReconnectionSettings(baseDelayMs: Long, maxDelayMs: Long) extends ReconnectionSettings { + case class ExponentialReconnectionSettings(baseDelayMs: Long, maxDelayMs: Long, maximumAttempts: Long = Long.MaxValue) extends ReconnectionSettings { require(baseDelayMs > 0, s"Base reconnection delay must be greater than 0. Received $baseDelayMs") require(maxDelayMs > 0, s"Maximum reconnection delay must be greater than 0. Received $maxDelayMs") require(maxDelayMs >= baseDelayMs, "Maximum reconnection delay cannot be smaller than base reconnection delay") def newSchedule = new ExponentialSchedule + private val ceil = if ((baseDelayMs & (baseDelayMs - 1)) == 0) 0 else 1 + private val attemptCeiling = JLong.SIZE - JLong.numberOfLeadingZeros(Long.MaxValue / baseDelayMs) - ceil + class ExponentialSchedule extends ReconnectionSchedule { - var attempts = 0 + val maxAttempts = maximumAttempts def nextDelayMs = { attempts += 1 - Math.min(baseDelayMs * (1L << attempts), maxDelayMs) + if (attempts > attemptCeiling) { + maxDelayMs + } else { + val factor = 1L << (attempts - 1) + Math.min(baseDelayMs * factor, maxDelayMs) + } } } } diff --git a/src/main/scala/com/redis/RedisConnection.scala b/src/main/scala/com/redis/RedisConnection.scala index 783f83b..5eab65e 100644 --- a/src/main/scala/com/redis/RedisConnection.scala +++ b/src/main/scala/com/redis/RedisConnection.scala @@ -46,17 +46,17 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis context watch pipe case CommandFailed(c: Connect) => - settings.reconnectionSettings match { - case Some(r) => - if (reconnectionSchedule.isEmpty) { - reconnectionSchedule = Some(settings.reconnectionSettings.get.newSchedule) - } - val delayMs = reconnectionSchedule.get.nextDelayMs - log.error("Connect failed for {} with {}. Reconnecting in {} ms... ", c.remoteAddress, c.failureMessage, delayMs) - context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self) - case None => - log.error("Connect failed for {} with {}. Stopping... ", c.remoteAddress, c.failureMessage) - context stop self + if (reconnectionSchedule.isEmpty) { + reconnectionSchedule = Some(settings.reconnectionSettings.newSchedule) + } + if (reconnectionSchedule.get.attempts < reconnectionSchedule.get.maxAttempts) { + val delayMs = reconnectionSchedule.get.nextDelayMs + log.error("Connect failed for {} with {}. Reconnecting in {} ms... ", c.remoteAddress, c.failureMessage, delayMs) + context become unconnected + context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self) + } else { + log.error("Connect failed for {} with {}. Stopping... ", c.remoteAddress, c.failureMessage) + context stop self } } @@ -123,18 +123,17 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis def withTerminationManagement(handler: Receive): Receive = handler orElse { case Terminated(x) => { - settings.reconnectionSettings match { - case Some(r) => - if (reconnectionSchedule.isEmpty) { - reconnectionSchedule = Some(settings.reconnectionSettings.get.newSchedule) - } - val delayMs = reconnectionSchedule.get.nextDelayMs - log.error("Child termination detected: {}. Reconnecting in {} ms... ", x, delayMs) - context become unconnected - context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self) - case None => - log.error("Child termination detected: {}", x) - context stop self + if (reconnectionSchedule.isEmpty) { + reconnectionSchedule = Some(settings.reconnectionSettings.newSchedule) + } + if (reconnectionSchedule.get.attempts < reconnectionSchedule.get.maxAttempts) { + val delayMs = reconnectionSchedule.get.nextDelayMs + log.error("Child termination detected: {}. Reconnecting in {} ms... ", x, delayMs) + context become unconnected + context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self) + } else { + log.error("Child termination detected: {}", x) + context stop self } } } From dffac618dccc78a64f6439180301e817e32f67be Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sat, 19 Apr 2014 14:25:21 -0500 Subject: [PATCH 03/15] Refactor RedisSpecBase to use the Akka TestKit Reference: http://doc.akka.io/docs/akka/2.3.0/scala/testing.html --- src/test/scala/com/redis/RedisSpecBase.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/test/scala/com/redis/RedisSpecBase.scala b/src/test/scala/com/redis/RedisSpecBase.scala index 6271b18..b52b335 100644 --- a/src/test/scala/com/redis/RedisSpecBase.scala +++ b/src/test/scala/com/redis/RedisSpecBase.scala @@ -8,17 +8,17 @@ import com.redis.RedisClientSettings.ConstantReconnectionSettings import org.scalatest._ import org.scalatest.concurrent.{Futures, ScalaFutures} import org.scalatest.time._ +import akka.testkit.TestKit -trait RedisSpecBase extends FunSpec +class RedisSpecBase(_system: ActorSystem) extends TestKit(_system) + with FunSpecLike with Matchers with Futures with ScalaFutures with BeforeAndAfterEach with BeforeAndAfterAll { - import RedisSpecBase._ - // Akka setup - implicit val system = ActorSystem("redis-test-"+ iter.next) + def this() = this(ActorSystem("redis-test-"+ RedisSpecBase.iter.next)) implicit val executionContext = system.dispatcher implicit val timeout = Timeout(2 seconds) @@ -26,7 +26,12 @@ trait RedisSpecBase extends FunSpec implicit val defaultPatience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(5, Millis)) // Redis client setup - val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = Some(ConstantReconnectionSettings(1000)))) + val client = RedisClient("localhost", 6379) + + def withReconnectingClient(testCode: RedisClient => Any) = { + val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = ConstantReconnectionSettings(1000))) + testCode(client) + } override def beforeEach = { client.flushdb() @@ -50,6 +55,6 @@ trait RedisSpecBase extends FunSpec object RedisSpecBase { - private val iter = Iterator from 0 + val iter = Iterator from 0 } From d9223e49a48c1429bddec9503a0e64e24ca8dbbf Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sat, 19 Apr 2014 14:26:22 -0500 Subject: [PATCH 04/15] Update ClientSpec to test no reconnection and reconnection policies --- src/test/scala/com/redis/ClientSpec.scala | 38 +++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/test/scala/com/redis/ClientSpec.scala b/src/test/scala/com/redis/ClientSpec.scala index 76c7819..f6b3fd4 100644 --- a/src/test/scala/com/redis/ClientSpec.scala +++ b/src/test/scala/com/redis/ClientSpec.scala @@ -2,13 +2,12 @@ package com.redis import scala.concurrent.Future +import akka.testkit.TestProbe +import org.junit.runner.RunWith import org.scalatest.exceptions.TestFailedException import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith - import serialization._ - @RunWith(classOf[JUnitRunner]) class ClientSpec extends RedisSpecBase { @@ -75,21 +74,34 @@ class ClientSpec extends RedisSpecBase { } describe("reconnections based on policy") { - it("should reconnect") { - val key = "reconnect_test" - - client.lpush(key, 0) - + it("should not reconnect by default") { + val probe = TestProbe() + probe watch client.clientRef // Extract our address // TODO Cleaner address extraction, perhaps in ServerOperations.client? val address = client.client.list().futureValue.get.toString.split(" ").head.split("=").last - client.client.kill(address).futureValue should be (true) + client.client.kill(address).futureValue should be(true) + probe.expectTerminated(client.clientRef) + } - client.lpush(key, 1 to 100).futureValue should equal (101) - val list = client.lrange[Long](key, 0, -1).futureValue + it("should reconnect with settings") { + withReconnectingClient { + client => + val key = "reconnect_test" - list.size should equal (101) - list.reverse should equal (0 to 100) + client.lpush(key, 0) + + // Extract our address + // TODO Cleaner address extraction, perhaps in ServerOperations.client? + val address = client.client.list().futureValue.get.toString.split(" ").head.split("=").last + client.client.kill(address).futureValue should be(true) + + client.lpush(key, 1 to 100).futureValue should equal(101) + val list = client.lrange[Long](key, 0, -1).futureValue + + list.size should equal(101) + list.reverse should equal(0 to 100) + } } } } From f9015d944446ff6f95238a74c3bcd05626d682f0 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sat, 19 Apr 2014 14:26:43 -0500 Subject: [PATCH 05/15] Organize imports --- src/test/scala/com/redis/RedisSpecBase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/redis/RedisSpecBase.scala b/src/test/scala/com/redis/RedisSpecBase.scala index b52b335..b787dc7 100644 --- a/src/test/scala/com/redis/RedisSpecBase.scala +++ b/src/test/scala/com/redis/RedisSpecBase.scala @@ -2,13 +2,13 @@ package com.redis import scala.concurrent.duration._ -import akka.util.Timeout import akka.actor._ +import akka.testkit.TestKit +import akka.util.Timeout import com.redis.RedisClientSettings.ConstantReconnectionSettings import org.scalatest._ import org.scalatest.concurrent.{Futures, ScalaFutures} import org.scalatest.time._ -import akka.testkit.TestKit class RedisSpecBase(_system: ActorSystem) extends TestKit(_system) with FunSpecLike From e6eaaf60ede67ba4c1b0e0f5d4e712f9325fa267 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sun, 20 Apr 2014 12:09:47 -0500 Subject: [PATCH 06/15] Add akka-testkit to the dependencies --- project/ScalaRedisProject.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/project/ScalaRedisProject.scala b/project/ScalaRedisProject.scala index e69c13f..0543374 100644 --- a/project/ScalaRedisProject.scala +++ b/project/ScalaRedisProject.scala @@ -24,7 +24,8 @@ object ScalaRedisProject extends Build "org.slf4j" % "slf4j-api" % "1.7.6" % "provided", "ch.qos.logback" % "logback-classic" % "1.1.1" % "provided", "junit" % "junit" % "4.11" % "test", - "org.scalatest" %% "scalatest" % "2.1.0" % "test", + "org.scalatest" %% "scalatest" % "2.1.0" % "test", + "com.typesafe.akka" %% "akka-testkit" % "2.3.0" % "test", // Third-party serialization libraries "net.liftweb" %% "lift-json" % "2.5.1" % "provided, test", From deefa6879fd1737f288610393893a38c06a7adea Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sun, 20 Apr 2014 12:35:58 -0500 Subject: [PATCH 07/15] Revert unconnected logic --- src/main/scala/com/redis/RedisConnection.scala | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/redis/RedisConnection.scala b/src/main/scala/com/redis/RedisConnection.scala index 5eab65e..7a29efe 100644 --- a/src/main/scala/com/redis/RedisConnection.scala +++ b/src/main/scala/com/redis/RedisConnection.scala @@ -46,18 +46,8 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis context watch pipe case CommandFailed(c: Connect) => - if (reconnectionSchedule.isEmpty) { - reconnectionSchedule = Some(settings.reconnectionSettings.newSchedule) - } - if (reconnectionSchedule.get.attempts < reconnectionSchedule.get.maxAttempts) { - val delayMs = reconnectionSchedule.get.nextDelayMs - log.error("Connect failed for {} with {}. Reconnecting in {} ms... ", c.remoteAddress, c.failureMessage, delayMs) - context become unconnected - context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self) - } else { - log.error("Connect failed for {} with {}. Stopping... ", c.remoteAddress, c.failureMessage) - context stop self - } + log.error("Connect failed for {} with {}. Stopping... ", c.remoteAddress, c.failureMessage) + context stop self } def transactional(pipe: ActorRef): Receive = withTerminationManagement { From 2522043f205125c3e64ff9643cd7bd188e23e468 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sun, 20 Apr 2014 12:46:54 -0500 Subject: [PATCH 08/15] Reconnect more quickly during tests --- src/test/scala/com/redis/RedisSpecBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/redis/RedisSpecBase.scala b/src/test/scala/com/redis/RedisSpecBase.scala index b787dc7..bc8541d 100644 --- a/src/test/scala/com/redis/RedisSpecBase.scala +++ b/src/test/scala/com/redis/RedisSpecBase.scala @@ -29,7 +29,7 @@ class RedisSpecBase(_system: ActorSystem) extends TestKit(_system) val client = RedisClient("localhost", 6379) def withReconnectingClient(testCode: RedisClient => Any) = { - val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = ConstantReconnectionSettings(1000))) + val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = ConstantReconnectionSettings(100))) testCode(client) } From 039b040c581652c4aefac57bbeb6643ef3311d8d Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sun, 20 Apr 2014 12:48:45 -0500 Subject: [PATCH 09/15] More reliable client termination. Now kills the most recent redis client --- src/test/scala/com/redis/ClientSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/scala/com/redis/ClientSpec.scala b/src/test/scala/com/redis/ClientSpec.scala index f6b3fd4..adfdbf1 100644 --- a/src/test/scala/com/redis/ClientSpec.scala +++ b/src/test/scala/com/redis/ClientSpec.scala @@ -77,9 +77,9 @@ class ClientSpec extends RedisSpecBase { it("should not reconnect by default") { val probe = TestProbe() probe watch client.clientRef - // Extract our address - // TODO Cleaner address extraction, perhaps in ServerOperations.client? - val address = client.client.list().futureValue.get.toString.split(" ").head.split("=").last + val clients = client.client.list().futureValue.get.split('\n') + // addr=127.0.0.1:65227 fd=9 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client + val address = clients.last.split(" ").head.split("=").last client.client.kill(address).futureValue should be(true) probe.expectTerminated(client.clientRef) } From d752e92f261487cf21fa2d7f4cd07044ec5ad902 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Sun, 20 Apr 2014 12:51:21 -0500 Subject: [PATCH 10/15] Switch to lazy reconnection schedule --- src/main/scala/com/redis/RedisConnection.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/redis/RedisConnection.scala b/src/main/scala/com/redis/RedisConnection.scala index 7a29efe..847236f 100644 --- a/src/main/scala/com/redis/RedisConnection.scala +++ b/src/main/scala/com/redis/RedisConnection.scala @@ -25,7 +25,7 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis private[this] var pendingRequests = Queue.empty[RedisRequest] private[this] var txnRequests = Queue.empty[RedisRequest] - private[this] var reconnectionSchedule: Option[_ <: ReconnectionSettings#ReconnectionSchedule] = None + private[this] lazy val reconnectionSchedule = settings.reconnectionSettings.newSchedule IO(Tcp) ! Connect(remote) @@ -113,11 +113,8 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis def withTerminationManagement(handler: Receive): Receive = handler orElse { case Terminated(x) => { - if (reconnectionSchedule.isEmpty) { - reconnectionSchedule = Some(settings.reconnectionSettings.newSchedule) - } - if (reconnectionSchedule.get.attempts < reconnectionSchedule.get.maxAttempts) { - val delayMs = reconnectionSchedule.get.nextDelayMs + if (reconnectionSchedule.attempts < reconnectionSchedule.maxAttempts) { + val delayMs = reconnectionSchedule.nextDelayMs log.error("Child termination detected: {}. Reconnecting in {} ms... ", x, delayMs) context become unconnected context.system.scheduler.scheduleOnce(Duration(delayMs, TimeUnit.MILLISECONDS), IO(Tcp), Connect(remote))(context.dispatcher, self) From 76a71027af0212c8fa2090e12285408e9ee270fb Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Mon, 21 Apr 2014 16:26:03 -0500 Subject: [PATCH 11/15] Convert attempts to a long to match maxAttempts --- src/main/scala/com/redis/RedisClientSettings.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/redis/RedisClientSettings.scala b/src/main/scala/com/redis/RedisClientSettings.scala index 84f97f5..2f39f55 100644 --- a/src/main/scala/com/redis/RedisClientSettings.scala +++ b/src/main/scala/com/redis/RedisClientSettings.scala @@ -26,7 +26,7 @@ object RedisClientSettings { trait ReconnectionSchedule { val maxAttempts: Long - var attempts = 0 + var attempts: Long = 0 /** * Gets the number of milliseconds until the next reconnection attempt. From e9aab410b88ab941e7d70daeebc4311a04c95e51 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Mon, 21 Apr 2014 16:26:30 -0500 Subject: [PATCH 12/15] Bugfix in CLIENT SETNAME --- src/main/scala/com/redis/protocol/ServerCommands.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/redis/protocol/ServerCommands.scala b/src/main/scala/com/redis/protocol/ServerCommands.scala index 093d865..0c0158e 100644 --- a/src/main/scala/com/redis/protocol/ServerCommands.scala +++ b/src/main/scala/com/redis/protocol/ServerCommands.scala @@ -49,7 +49,7 @@ object ServerCommands { } case class SetName(name: String) extends RedisCommand[Boolean]("CLIENT") { - def params = "SETNAME" +: ANil + def params = "SETNAME" +: name +: ANil } case class Kill(ipPort: String) extends RedisCommand[Boolean]("CLIENT") { From d1f8f587b1627328ef3370b45349db6f08d4cdb2 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Mon, 21 Apr 2014 16:26:50 -0500 Subject: [PATCH 13/15] Close reconnecting clients after each use --- src/test/scala/com/redis/RedisSpecBase.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/scala/com/redis/RedisSpecBase.scala b/src/test/scala/com/redis/RedisSpecBase.scala index bc8541d..d987ecf 100644 --- a/src/test/scala/com/redis/RedisSpecBase.scala +++ b/src/test/scala/com/redis/RedisSpecBase.scala @@ -31,6 +31,7 @@ class RedisSpecBase(_system: ActorSystem) extends TestKit(_system) def withReconnectingClient(testCode: RedisClient => Any) = { val client = RedisClient("localhost", 6379, settings = RedisClientSettings(reconnectionSettings = ConstantReconnectionSettings(100))) testCode(client) + client.quit().futureValue should equal (true) } override def beforeEach = { From 5db9a5269922ffabce67a18d576524e6b7c24e36 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Mon, 21 Apr 2014 16:27:56 -0500 Subject: [PATCH 14/15] Added a helper method to kill named clients, fixing test reliability --- src/test/scala/com/redis/ClientSpec.scala | 54 ++++++++++++++++------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/src/test/scala/com/redis/ClientSpec.scala b/src/test/scala/com/redis/ClientSpec.scala index adfdbf1..b5e353a 100644 --- a/src/test/scala/com/redis/ClientSpec.scala +++ b/src/test/scala/com/redis/ClientSpec.scala @@ -7,6 +7,11 @@ import org.junit.runner.RunWith import org.scalatest.exceptions.TestFailedException import org.scalatest.junit.JUnitRunner import serialization._ +import akka.io.Tcp.{Connected, CommandFailed} +import scala.reflect.ClassTag +import scala.concurrent.duration._ +import com.redis.RedisClientSettings.ConstantReconnectionSettings +import com.redis.protocol.ServerCommands.Client.Kill @RunWith(classOf[JUnitRunner]) class ClientSpec extends RedisSpecBase { @@ -74,33 +79,50 @@ class ClientSpec extends RedisSpecBase { } describe("reconnections based on policy") { + + def killClientsNamed(rc: RedisClient, name: String): Future[List[Boolean]] = { + // Clients are a list of lines similar to + // addr=127.0.0.1:65227 fd=9 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client + // We'll split them up and make a map + val clients = rc.client.list().futureValue.get.toString + .split('\n') + .map(_.trim) + .filterNot(_.isEmpty) + .map( + _.split(" ").map( + _.split("=").padTo(2, "") + ).map( + item => (item(0), item(1)) + ) + ).map(_.toMap) + Future.sequence(clients.filter(_("name") == name).map(_("addr")).map(rc.client.kill).toList) + } + it("should not reconnect by default") { + val name = "test-client-1" + client.client.setname(name).futureValue should equal (true) + val probe = TestProbe() probe watch client.clientRef - val clients = client.client.list().futureValue.get.split('\n') - // addr=127.0.0.1:65227 fd=9 name= age=0 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client - val address = clients.last.split(" ").head.split("=").last - client.client.kill(address).futureValue should be(true) + killClientsNamed(client, name).futureValue.reduce(_ && _) should equal (true) probe.expectTerminated(client.clientRef) } it("should reconnect with settings") { - withReconnectingClient { - client => - val key = "reconnect_test" + withReconnectingClient { client => + val name = "test-client-2" + client.client.setname(name).futureValue should equal (true) - client.lpush(key, 0) + val key = "reconnect_test" + client.lpush(key, 0) - // Extract our address - // TODO Cleaner address extraction, perhaps in ServerOperations.client? - val address = client.client.list().futureValue.get.toString.split(" ").head.split("=").last - client.client.kill(address).futureValue should be(true) + killClientsNamed(client, name).futureValue.reduce(_ && _) should equal (true) - client.lpush(key, 1 to 100).futureValue should equal(101) - val list = client.lrange[Long](key, 0, -1).futureValue + client.lpush(key, 1 to 100).futureValue should equal(101) + val list = client.lrange[Long](key, 0, -1).futureValue - list.size should equal(101) - list.reverse should equal(0 to 100) + list.size should equal(101) + list.reverse should equal(0 to 100) } } } From 963c025f70e5c510626d28e53c87d314083cca93 Mon Sep 17 00:00:00 2001 From: Logan Lowell Date: Tue, 22 Apr 2014 13:31:09 -0500 Subject: [PATCH 15/15] Ooops, leave that iter private --- src/test/scala/com/redis/RedisSpecBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/redis/RedisSpecBase.scala b/src/test/scala/com/redis/RedisSpecBase.scala index d987ecf..7f38c97 100644 --- a/src/test/scala/com/redis/RedisSpecBase.scala +++ b/src/test/scala/com/redis/RedisSpecBase.scala @@ -56,6 +56,6 @@ class RedisSpecBase(_system: ActorSystem) extends TestKit(_system) object RedisSpecBase { - val iter = Iterator from 0 + private val iter = Iterator from 0 }