From 185ed5278cfebbded5ef40606f14fd079549862a Mon Sep 17 00:00:00 2001 From: okumin Date: Wed, 17 Sep 2014 01:55:03 +0900 Subject: [PATCH 1/2] Use Queue API instead of inefficient operations In Scala 2.10.x, Queue#:+ has a performance issue. https://github.com/scala/scala/commit/f0f0a5e7813501d985174d3c5573c34c8a7608c6 So this commit replaces that operations with Queue#enqueue. And use Queue#dequeue explicitly instead of Queue#head and Queue#tail. There is little dirrerence between these operations, but it will be obvious that Queue is used correctly. --- src/main/scala/com/redis/RedisConnection.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/redis/RedisConnection.scala b/src/main/scala/com/redis/RedisConnection.scala index a2d7723..ef8ee42 100644 --- a/src/main/scala/com/redis/RedisConnection.scala +++ b/src/main/scala/com/redis/RedisConnection.scala @@ -14,7 +14,6 @@ import scala.language.existentials import akka.actor._ import akka.io.{BackpressureBuffer, IO, Tcp, TcpPipelineHandler} -import com.redis.RedisClientSettings.ReconnectionSettings import pipeline._ import protocol._ @@ -71,7 +70,7 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis context become (running(pipe)) case TransactionCommands.Discard => - txnRequests = txnRequests.drop(txnRequests.size) + txnRequests = Queue.empty sendRequest(pipe, RedisRequest(sender, TransactionCommands.Discard)) context become (running(pipe)) @@ -161,10 +160,10 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis } def addPendingRequest(cmd: RedisCommand[_]): Unit = - pendingRequests :+= RedisRequest(sender, cmd) + pendingRequests = pendingRequests.enqueue(RedisRequest(sender(), cmd)) def addTxnRequest(cmd: RedisCommand[_]): Unit = - txnRequests :+= RedisRequest(sender, cmd) + txnRequests = txnRequests.enqueue(RedisRequest(sender(), cmd)) def sendRequest(pipe: ActorRef, req: RedisRequest): Unit = { pipe ! init.Command(req) @@ -173,17 +172,18 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis @tailrec final def sendAllPendingRequests(pipe: ActorRef): Unit = if (pendingRequests.nonEmpty) { - val request = pendingRequests.head + val (request, tail) = pendingRequests.dequeue self.tell( request.command, request.sender ) - pendingRequests = pendingRequests.tail + pendingRequests = tail sendAllPendingRequests(pipe) } @tailrec final def sendAllTxnRequests(pipe: ActorRef): Unit = if (txnRequests.nonEmpty) { - sendRequest(pipe, txnRequests.head) - txnRequests = txnRequests.tail + val (request, tail) = txnRequests.dequeue + sendRequest(pipe, request) + txnRequests = tail sendAllTxnRequests(pipe) } From 70761cccb68b8aeb976c7426bdceb5789efaf473 Mon Sep 17 00:00:00 2001 From: okumin Date: Wed, 24 Sep 2014 02:37:11 +0900 Subject: [PATCH 2/2] Use Queue API in ResponseHandling --- .../com/redis/pipeline/ResponseHandling.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/redis/pipeline/ResponseHandling.scala b/src/main/scala/com/redis/pipeline/ResponseHandling.scala index 4185634..593e02a 100644 --- a/src/main/scala/com/redis/pipeline/ResponseHandling.scala +++ b/src/main/scala/com/redis/pipeline/ResponseHandling.scala @@ -55,18 +55,18 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman @tailrec def parseExecResponse(data: CompactByteString, acc: Iterable[Any]): Iterable[Any] = { - if (txnRequests isEmpty) acc + if (txnRequests.isEmpty) acc else { + val (RedisRequest(commander, command), tail) = txnRequests.dequeue // process every response with the appropriate de-serializer that we have accumulated in txnRequests - parser.parse(data, txnRequests.head.command.des) match { + parser.parse(data, command.des) match { case Result.Ok(reply, remaining) => val result = reply match { case err: RedisError => Failure(err) case _ => reply } - val RedisRequest(commander, cmd) = txnRequests.head commander.tell(result, redisClientRef) - txnRequests = txnRequests.tail + txnRequests = tail parseExecResponse(remaining, acc ++ List(result)) case Result.NeedMoreData => @@ -85,7 +85,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman def parseAndDispatch(data: CompactByteString): Iterable[Result] = if (sentRequests.isEmpty) ctx.singleEvent(RequestQueueEmpty) else { - val RedisRequest(commander, cmd) = sentRequests.head + val (RedisRequest(commander, cmd), tail) = sentRequests.dequeue // we have an Exec : need to parse the response which will be a collection of // MultiBulk and then end transaction mode @@ -94,7 +94,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman else { val result = if (Deserializer.nullMultiBulk(data)) { - txnRequests = txnRequests.drop(txnRequests.size) + txnRequests = Queue.empty Failure(Deserializer.EmptyTxnResultException) } else parseExecResponse(data.splitAt(data.indexOf(Lf) + 1)._2.compact, List.empty[Result]) @@ -110,7 +110,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman } log.debug("RESULT: {}", result) if (reply != Queued) commander.tell(result, redisClientRef) - sentRequests = sentRequests.tail + sentRequests = tail parseAndDispatch(remaining) case Result.NeedMoreData => ctx.singleEvent(RequestQueueEmpty) @@ -142,11 +142,12 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman // queue up all commands between multi & exec // this is an optimization that sends commands in bulk for transaction mode - if (txnMode && req.command != TransactionCommands.Multi && req.command != TransactionCommands.Exec) - txnRequests :+= req + if (txnMode && req.command != TransactionCommands.Multi && req.command != TransactionCommands.Exec) { + txnRequests = txnRequests.enqueue(req) + } log.debug("Sending {}, previous head: {}", req.command, sentRequests.headOption.map(_.command)) - sentRequests :+= req + sentRequests = sentRequests.enqueue(req) ctx singleCommand req case _ => ctx singleCommand cmd