Skip to content

Commit c9c3935

Browse files
committed
Use Queue API in ResponseHandling
1 parent 185ed52 commit c9c3935

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

src/main/scala/com/redis/pipeline/ResponseHandling.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,19 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
5555

5656
@tailrec
5757
def parseExecResponse(data: CompactByteString, acc: Iterable[Any]): Iterable[Any] = {
58-
if (txnRequests isEmpty) acc
58+
if (txnRequests.isEmpty) acc
5959
else {
60+
val (request, tail) = txnRequests.dequeue
6061
// process every response with the appropriate de-serializer that we have accumulated in txnRequests
61-
parser.parse(data, txnRequests.head.command.des) match {
62+
parser.parse(data, request.command.des) match {
6263
case Result.Ok(reply, remaining) =>
6364
val result = reply match {
6465
case err: RedisError => Failure(err)
6566
case _ => reply
6667
}
6768
val RedisRequest(commander, cmd) = txnRequests.head
6869
commander.tell(result, redisClientRef)
69-
txnRequests = txnRequests.tail
70+
txnRequests = tail
7071
parseExecResponse(remaining, acc ++ List(result))
7172

7273
case Result.NeedMoreData =>
@@ -85,7 +86,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
8586
def parseAndDispatch(data: CompactByteString): Iterable[Result] =
8687
if (sentRequests.isEmpty) ctx.singleEvent(RequestQueueEmpty)
8788
else {
88-
val RedisRequest(commander, cmd) = sentRequests.head
89+
val (RedisRequest(commander, cmd), tail) = sentRequests.dequeue
8990

9091
// we have an Exec : need to parse the response which will be a collection of
9192
// MultiBulk and then end transaction mode
@@ -94,7 +95,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
9495
else {
9596
val result =
9697
if (Deserializer.nullMultiBulk(data)) {
97-
txnRequests = txnRequests.drop(txnRequests.size)
98+
txnRequests = Queue.empty
9899
Failure(Deserializer.EmptyTxnResultException)
99100
} else parseExecResponse(data.splitAt(data.indexOf(Lf) + 1)._2.compact, List.empty[Result])
100101

@@ -110,7 +111,7 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
110111
}
111112
log.debug("RESULT: {}", result)
112113
if (reply != Queued) commander.tell(result, redisClientRef)
113-
sentRequests = sentRequests.tail
114+
sentRequests = tail
114115
parseAndDispatch(remaining)
115116

116117
case Result.NeedMoreData => ctx.singleEvent(RequestQueueEmpty)
@@ -142,11 +143,12 @@ class ResponseHandling extends PipelineStage[WithinActorContext, Command, Comman
142143

143144
// queue up all commands between multi & exec
144145
// this is an optimization that sends commands in bulk for transaction mode
145-
if (txnMode && req.command != TransactionCommands.Multi && req.command != TransactionCommands.Exec)
146-
txnRequests :+= req
146+
if (txnMode && req.command != TransactionCommands.Multi && req.command != TransactionCommands.Exec) {
147+
txnRequests = txnRequests.enqueue(req)
148+
}
147149

148150
log.debug("Sending {}, previous head: {}", req.command, sentRequests.headOption.map(_.command))
149-
sentRequests :+= req
151+
sentRequests = sentRequests.enqueue(req)
150152
ctx singleCommand req
151153

152154
case _ => ctx singleCommand cmd

0 commit comments

Comments
 (0)