Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2dcd409
Better behaviour in the presence of backing off
j-baker Aug 2, 2018
c583b33
Merge branch 'develop' into jbaker/better_429_behaviour
j-baker Aug 2, 2018
d51d3ac
more docs, easier to read
j-baker Aug 2, 2018
19a8173
Better concurrency limiters
j-baker Aug 3, 2018
d7164de
fixes
j-baker Aug 3, 2018
39388b7
simplify
j-baker Aug 3, 2018
f2927d0
Checkstyle
j-baker Aug 13, 2018
8f57de6
checkstyle
j-baker Aug 13, 2018
8f02b8b
PR comments
j-baker Sep 2, 2018
fbbcc41
Tweak the concurrency limiters lib
j-baker Sep 2, 2018
27a2153
reset flow control test
j-baker Sep 2, 2018
e548996
changes
j-baker Sep 2, 2018
a4c9e68
Changes
j-baker Sep 2, 2018
c68bc89
Passes the build
j-baker Sep 2, 2018
91dd6d2
update lockfiles
j-baker Sep 2, 2018
2229a81
Merge remote-tracking branch 'origin/develop' into jbaker/better_429_…
j-baker Sep 11, 2018
ee8e539
New attempt using interceptor
j-baker Sep 11, 2018
036d45b
more comments
j-baker Sep 11, 2018
1e18435
some bullshit
j-baker Sep 12, 2018
fbdeab1
Perfect
j-baker Sep 12, 2018
951dfdd
cleanup
j-baker Sep 12, 2018
ed18c48
Ready to go?
j-baker Sep 13, 2018
fda4d64
docs
j-baker Sep 13, 2018
8a088d7
Checkstyle
j-baker Sep 13, 2018
a9721e4
chekcstyle
j-baker Sep 13, 2018
1ce7b82
Metric
j-baker Sep 13, 2018
bc38b76
Javadoc
iamdanfox Sep 13, 2018
addbdca
README describes new flow control
iamdanfox Sep 13, 2018
c97925a
Move docs -> class level javadoc
iamdanfox Sep 13, 2018
baaa142
Rename ConcurrencyLimiters#limiter -> acquireLimiter
iamdanfox Sep 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Better concurrency limiters
  • Loading branch information
j-baker committed Aug 3, 2018
commit 19a817309c02d6b6eaf0bcdc7e35f4b3cad7da29
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,28 @@
import org.slf4j.LoggerFactory;

/**
* Remoting calls may observe 429 or 503 responses from the server, at which point they back off in order to
* reduce excess load. Unfortunately this state on backing off is stored per-call, so 429s or 503s in one call do not
* cause any request rate slowdown in subsequent calls. This class affects this by adjusting the number of requests
* that might be dispatched to a given endpoint.
* Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state
* via 429/503 responses, and clients throttle the number of requests that they send concurrently as a response to this.
* The latter is implemented as a combination of two techniques, yielding a mechanism similar to flow control in TCP/IP.
* <ol>
* <li>
* Clients use the frequency of 429/503 responses (as well as the request latency) to determine an estimate
* for the number of permissible concurrent requests
* </li>
* <li>
* Each such request gets scheduled according to an exponential backoff algorithm.
* </li>
* </ol>
* <p>
* This is based on Netflix's <a href="https://github.com/Netflix/concurrency-limits/">Concurrency Limits</a> library,
* which provides a number of primitives for this.
* This class provides an asynchronous implementation of Netflix's
* <a href="https://github.com/Netflix/concurrency-limits/">concurrency-limits</a> library for determining the
* above mentioned concurrency estimates.
* <p>
* In order to use this class, one should get a Limiter for their request, which returns a future. once the Future is
* done, the caller can assume that the request is schedulable. After the request completes, the caller <b>must</b>
* call one of the methods on {@link Limiter.Listener} in order to provide feedback about the request's success.
* If this is not done, throughput will be negatively affected. We attempt to eventually recover to avoid a total
* deadlock, but this is not guaranteed.
* In order to use this class, one should acquire a Limiter for their request, which returns a future. once the Future
* is completed, the caller can assume that the request is schedulable. After the request completes, the caller
* <b>must</b> call one of the methods on {@link Limiter.Listener} in order to provide feedback about the request's
* success. If this is not done, throughput will be negatively affected. We attempt to eventually recover to avoid a
* total deadlock, but this is not guaranteed.
*/
final class ConcurrencyLimiters {
private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimiters.class);
Expand All @@ -72,15 +81,11 @@ final class ConcurrencyLimiters {

private final ConcurrentMap<String, ConcurrencyLimiter> limiters = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this a size limited cache?


private static Limiter<Void> newLimiter() {
return DefaultLimiter.newBuilder()
.limit(TracingLimitDecorator.wrap(AIMDLimit.newBuilder().initialLimit(1).build()))
.build(new SimpleStrategy<>());
}

@VisibleForTesting
ConcurrencyLimiter limiter(String name) {
return limiters.computeIfAbsent(name, key -> new ConcurrencyLimiter(activeListeners, newLimiter()));
return limiters.computeIfAbsent(name, key -> new ConcurrencyLimiter(DefaultLimiter.newBuilder()
.limit(TracingLimitDecorator.wrap(AIMDLimit.newBuilder().initialLimit(1).build()))
.build(new SimpleStrategy<>())));
}

ConcurrencyLimiter limiter(Request request) {
Expand All @@ -105,14 +110,10 @@ ConcurrencyLimiter limiter(Request request) {
* some more.
*/
static final class ConcurrencyLimiter {
private final Map<Limiter.Listener, Runnable> activeListeners;
private final Queue<SettableFuture<Limiter.Listener>> waitingRequests = new LinkedBlockingQueue<>();
private final Limiter<Void> limiter;

public ConcurrencyLimiter(
Map<Limiter.Listener, Runnable> activeListeners,
Limiter<Void> limiter) {
this.activeListeners = activeListeners;
public ConcurrencyLimiter(Limiter<Void> limiter) {
this.limiter = limiter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ public void onSuccess(Listener listener) {
public void onFailure(Throwable t) {
callback.onFailure(
RemotingOkHttpCall.this,
new IOException(new AssertionError("This should never happen", t)));
new IOException(new AssertionError("This should never happen, since it implies "
+ "we failed when using the concurrency limiter", t)));
}
}, MoreExecutors.directExecutor());
}
Expand Down