Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/main/scala/com/redis/RedisConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import scala.language.existentials

import akka.actor._
import akka.io.{BackpressureBuffer, IO, Tcp, TcpPipelineHandler}
import com.redis.RedisClientSettings.ReconnectionSettings
import pipeline._
import protocol._

Expand Down Expand Up @@ -71,7 +70,7 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis
context become (running(pipe))

case TransactionCommands.Discard =>
txnRequests = txnRequests.drop(txnRequests.size)
txnRequests = Queue.empty
sendRequest(pipe, RedisRequest(sender, TransactionCommands.Discard))
context become (running(pipe))

Expand Down Expand Up @@ -161,10 +160,10 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis
}

def addPendingRequest(cmd: RedisCommand[_]): Unit =
pendingRequests :+= RedisRequest(sender, cmd)
pendingRequests = pendingRequests.enqueue(RedisRequest(sender(), cmd))

def addTxnRequest(cmd: RedisCommand[_]): Unit =
txnRequests :+= RedisRequest(sender, cmd)
txnRequests = txnRequests.enqueue(RedisRequest(sender(), cmd))

def sendRequest(pipe: ActorRef, req: RedisRequest): Unit = {
pipe ! init.Command(req)
Expand All @@ -173,17 +172,18 @@ private [redis] class RedisConnection(remote: InetSocketAddress, settings: Redis
@tailrec
final def sendAllPendingRequests(pipe: ActorRef): Unit =
if (pendingRequests.nonEmpty) {
val request = pendingRequests.head
val (request, tail) = pendingRequests.dequeue
self.tell( request.command, request.sender )
pendingRequests = pendingRequests.tail
pendingRequests = tail
sendAllPendingRequests(pipe)
}

@tailrec
final def sendAllTxnRequests(pipe: ActorRef): Unit =
if (txnRequests.nonEmpty) {
sendRequest(pipe, txnRequests.head)
txnRequests = txnRequests.tail
val (request, tail) = txnRequests.dequeue
sendRequest(pipe, request)
txnRequests = tail
sendAllTxnRequests(pipe)
}

Expand Down
21 changes: 11 additions & 10 deletions src/main/scala/com/redis/pipeline/ResponseHandling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman

@tailrec
def parseExecResponse(data: CompactByteString, acc: Iterable[Any]): Iterable[Any] = {
if (txnRequests isEmpty) acc
if (txnRequests.isEmpty) acc
else {
val (RedisRequest(commander, command), tail) = txnRequests.dequeue
// process every response with the appropriate de-serializer that we have accumulated in txnRequests
parser.parse(data, txnRequests.head.command.des) match {
parser.parse(data, command.des) match {
case Result.Ok(reply, remaining) =>
val result = reply match {
case err: RedisError => Failure(err)
case _ => reply
}
val RedisRequest(commander, cmd) = txnRequests.head
commander.tell(result, redisClientRef)
txnRequests = txnRequests.tail
txnRequests = tail
parseExecResponse(remaining, acc ++ List(result))

case Result.NeedMoreData =>
Expand All @@ -85,7 +85,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
def parseAndDispatch(data: CompactByteString): Iterable[Result] =
if (sentRequests.isEmpty) ctx.singleEvent(RequestQueueEmpty)
else {
val RedisRequest(commander, cmd) = sentRequests.head
val (RedisRequest(commander, cmd), tail) = sentRequests.dequeue

// we have an Exec : need to parse the response which will be a collection of
// MultiBulk and then end transaction mode
Expand All @@ -94,7 +94,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
else {
val result =
if (Deserializer.nullMultiBulk(data)) {
txnRequests = txnRequests.drop(txnRequests.size)
txnRequests = Queue.empty
Failure(Deserializer.EmptyTxnResultException)
} else parseExecResponse(data.splitAt(data.indexOf(Lf) + 1)._2.compact, List.empty[Result])

Expand All @@ -110,7 +110,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
}
log.debug("RESULT: {}", result)
if (reply != Queued) commander.tell(result, redisClientRef)
sentRequests = sentRequests.tail
sentRequests = tail
parseAndDispatch(remaining)

case Result.NeedMoreData => ctx.singleEvent(RequestQueueEmpty)
Expand Down Expand Up @@ -142,11 +142,12 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman

// queue up all commands between multi & exec
// this is an optimization that sends commands in bulk for transaction mode
if (txnMode && req.command != TransactionCommands.Multi && req.command != TransactionCommands.Exec)
txnRequests :+= req
if (txnMode && req.command != TransactionCommands.Multi && req.command != TransactionCommands.Exec) {
txnRequests = txnRequests.enqueue(req)
}

log.debug("Sending {}, previous head: {}", req.command, sentRequests.headOption.map(_.command))
sentRequests :+= req
sentRequests = sentRequests.enqueue(req)
ctx singleCommand req

case _ => ctx singleCommand cmd
Expand Down