Skip to content

Conversation

@knutwalker
Copy link

Some note about this implementation:

  • this is based on Support new CLIENT KILL semantics from Redis 2.8.12+ in tests #89
  • Reactive Streams handle back pressure by requesting demand if needed. The BackpressureBufferSettings are ignored, but it may be possible to somehow integrate these with the ActorProducer
  • Each client<->server connection is represented by a Flow
  • The pipe Actor no longer is the underlying connection pipeline but the producer for the connection Flow
  • termination management can be removed, since the connection Flow completes upon disconnection. Reacting to the Terminated message is done in unhandled
  • The tracking of the requests in flight is done by a Flow manipulator (RequestResponseHandler). This is one instance per Flow and is therefore shared amongst different threads/dispatchers. Concurrent manipulation is now backed by java.util.concurrent.* stuff. (Indeed, using the s.c.i.Queue randomly fails tests.)
  • I moved case classes and objects from the package object to a types.scala file. Using class likes in package objects in dispreferred by -Xlint and gave sometimes weird compiler issues
  • I changed the parsing of Boolean results.
    • Before, it would accept any String response as true, now it's only the Status string (OK). Redis sends other strings like +QUEUED that would have been treated as true.
    • It peeks at the result and pushes back the peeked bytes. I don't really like that solution, but i didn't wanted to change too much with the reply parsing.
    • This is also the part, where I'm not quite sure, how it could have worked before :)
  • Occasionally, Akka throws an Exception and complains about a terminated Processor. This does not seem to have any influence on the result, tests are green regardless. I'm also not sure, why the Exception is sometimes thrown and sometimes not. The Stacktrace gives no hints about own code, but it looks like this happens during Actor shutdown. I assume it has something to do with the fact, that akka-streams is still experimental, so maybe this is a bug in Akka.
01:43:51.717 [redis-test-9-akka.actor.default-dispatcher-14] ERROR a.stream.impl.TransformProcessorImpl - failure during processing
java.lang.IllegalStateException: Processor actor terminated abruptly
    at akka.stream.impl.ActorProcessorImpl.postStop(ActorProcessor.scala:287) ~[akka-stream-experimental_2.11-0.4.jar:na]
    at akka.stream.impl.TransformProcessorImpl.postStop(SingleStreamProcessors.scala:82) ~[akka-stream-experimental_2.11-0.4.jar:na]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.stream.impl.ActorProcessorImpl.aroundPostStop(ActorProcessor.scala:238) ~[akka-stream-experimental_2.11-0.4.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:369) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.11-2.3.4.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.11-2.3.4.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_20-ea]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_20-ea]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20-ea]

Looking forward to your Review :)

CLIENT KILL since version 2.8.12 has a new format:
http://redis.io/commands/client-kill

The interesting part is `CLIENT KILL SKIPME yes/no`.
By default this option is set to yes, which means that something in the sense of client.kill(self) wouldn't work anymore.

This commit changes the test to use a different client for killing the desired connection, if the redis version is greater than 2.8.12
implements debasishg#66

Reactive streams are about automatically managing back pressure,
so the current BP options are ignored (but not removed).
No client or test changes required
@debasishg
Copy link
Owner

Thanks for the PR. I will take a look over the weekend. @guersam - mind taking a look when u have some free time ?

@guersam
Copy link
Collaborator

guersam commented Jul 16, 2014

Sure, will investigate the processor error as well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

closes over Actor and ActorContext. Use pipeTo.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commanderAndResult match {
  case (commander, Success(res)) => commander ! res
  case (commander, Failure(e)) => commander ! Status.Failure(e)
}

saves a bit

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file has some predefined byte constants. How about moving it there for future reuse?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I moved it in 89985de.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :)

@guersam
Copy link
Collaborator

guersam commented Jul 28, 2014

Honestly I'm not familiar with akka-streams yet, however I think we can merge it because it tests all tests after all.

@debasishg/ For Scala 2.11 support, how about releasing current master before merging it? As it brings significant changes to the underlying codebase, it might be better to wait for a few more akka-streams updates before marking it as stable.

@debasishg
Copy link
Owner

@guersam +1 on your thoughts. I am also not much familiar with Akka streams yet. I will release the current master and then wait for akka streams to stabilize more before we do a release of a version based on akka streams.

@guersam
Copy link
Collaborator

guersam commented Jul 28, 2014

And then we also might be able to steal some patterns from the upcoming http client :P

@debasishg
Copy link
Owner

"upcoming http client" .. link ? Looks like I am not up to date w/ this development. Sure we can. But it's also great to have a baseline version, thanks to @knutwalker ..

@guersam
Copy link
Collaborator

guersam commented Jul 28, 2014

Of course, thanks a lot, @knutwalker!
FYI, currently akka-http-core is under release-2.3-dev branch. It might be better for us as well to keep a separate branch for one of akka-streams or current 0.6.x release, so that we can add some command or do some urgent patches easily.

@knutwalker
Copy link
Author

I can reopen this PR against any other branch, if you like. I'd say too, it's better to not push this in master right away. And I'm also interested in examining akka-http, there already seems to be slight difference. It looks like they're using a new FlowMaterializer per connection, which makes sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants