Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2dcd409
Better behaviour in the presence of backing off
j-baker Aug 2, 2018
c583b33
Merge branch 'develop' into jbaker/better_429_behaviour
j-baker Aug 2, 2018
d51d3ac
more docs, easier to read
j-baker Aug 2, 2018
19a8173
Better concurrency limiters
j-baker Aug 3, 2018
d7164de
fixes
j-baker Aug 3, 2018
39388b7
simplify
j-baker Aug 3, 2018
f2927d0
Checkstyle
j-baker Aug 13, 2018
8f57de6
checkstyle
j-baker Aug 13, 2018
8f02b8b
PR comments
j-baker Sep 2, 2018
fbbcc41
Tweak the concurrency limiters lib
j-baker Sep 2, 2018
27a2153
reset flow control test
j-baker Sep 2, 2018
e548996
changes
j-baker Sep 2, 2018
a4c9e68
Changes
j-baker Sep 2, 2018
c68bc89
Passes the build
j-baker Sep 2, 2018
91dd6d2
update lockfiles
j-baker Sep 2, 2018
2229a81
Merge remote-tracking branch 'origin/develop' into jbaker/better_429_…
j-baker Sep 11, 2018
ee8e539
New attempt using interceptor
j-baker Sep 11, 2018
036d45b
more comments
j-baker Sep 11, 2018
1e18435
some bullshit
j-baker Sep 12, 2018
fbdeab1
Perfect
j-baker Sep 12, 2018
951dfdd
cleanup
j-baker Sep 12, 2018
ed18c48
Ready to go?
j-baker Sep 13, 2018
fda4d64
docs
j-baker Sep 13, 2018
8a088d7
Checkstyle
j-baker Sep 13, 2018
a9721e4
chekcstyle
j-baker Sep 13, 2018
1ce7b82
Metric
j-baker Sep 13, 2018
bc38b76
Javadoc
iamdanfox Sep 13, 2018
addbdca
README describes new flow control
iamdanfox Sep 13, 2018
c97925a
Move docs -> class level javadoc
iamdanfox Sep 13, 2018
baaa142
Rename ConcurrencyLimiters#limiter -> acquireLimiter
iamdanfox Sep 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
some bullshit
  • Loading branch information
j-baker committed Sep 12, 2018
commit 1e18435422058e54b493a252f53eccd79fe39463
4 changes: 2 additions & 2 deletions jaxrs-clients/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.0.48",
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
Expand Down Expand Up @@ -391,7 +391,7 @@
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.0.48",
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
Expand Down
4 changes: 2 additions & 2 deletions jaxrs-scala-clients/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.0.48",
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
Expand Down Expand Up @@ -468,7 +468,7 @@
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.0.48",
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.remoting3.okhttp;

import com.netflix.concurrency.limits.Limiter;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/**
* {@link Limiter} that blocks the caller when the limit has been reached. The caller is
* blocked until the limiter has been released. This limiter is commonly used in batch
* clients that use the limiter as a back-pressure mechanism.
*
* @param <ContextT>
*/
public final class BlockingLimiter<ContextT> implements Limiter<ContextT> {
public static <ContextT> BlockingLimiter<ContextT> wrap(Limiter<ContextT> delegate) {
return new BlockingLimiter<>(Clock.systemUTC(), delegate, Optional.empty());
}

public static <ContextT> BlockingLimiter<ContextT> wrap(Limiter<ContextT> delegate, Duration timeout) {
return new BlockingLimiter<>(Clock.systemUTC(), delegate, Optional.of(timeout));
}

private final Limiter<ContextT> delegate;
private final Clock clock;
private final Optional<Duration> timeout;

/**
* Lock used to block and unblock callers as the limit is reached
*/
private final Object lock = new Object();

private BlockingLimiter(Clock clock, Limiter<ContextT> limiter, Optional<Duration> timeout) {
this.clock = clock;
this.delegate = limiter;
this.timeout = timeout;
}

private Optional<Listener> tryAcquire(ContextT context) {
Optional<Instant> timeoutTime = timeout.map(timeout -> clock.instant().plus(timeout));
synchronized (lock) {
while (true) {
Optional<Duration> durationRemaining = timeoutTime.map(t -> Duration.between(clock.instant(), t));
if (durationRemaining.isPresent()
&& (durationRemaining.get().isNegative() || durationRemaining.get().isZero())) {
return Optional.empty();
}

// Try to acquire a token and return immediately if successful
Optional<Listener> listener;
listener = delegate.acquire(context);
if (listener.isPresent()) {
return listener;
}

// We have reached the limit so block until a token is released
try {
if (durationRemaining.isPresent() && !durationRemaining.get().isZero()) {
lock.wait(durationRemaining.get().toMillis());
} else {
lock.wait();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Optional.empty();
}
}
}
}

private void unblock() {
synchronized (lock) {
lock.notifyAll();
}
}

@Override
public Optional<Listener> acquire(ContextT context) {
return tryAcquire(context).map(delegate -> new Listener() {
@Override
public void onSuccess() {
delegate.onSuccess();
unblock();
}

@Override
public void onIgnore() {
delegate.onIgnore();
unblock();
}

@Override
public void onDropped() {
delegate.onDropped();
unblock();
}
});
}

@Override
public String toString() {
return "BlockingLimiter [" + delegate + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

import com.google.common.annotations.VisibleForTesting;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limiter.BlockingLimiter;
import com.netflix.concurrency.limits.limit.VegasLimit;
import com.netflix.concurrency.limits.limiter.SimpleLimiter;
import com.palantir.logsafe.SafeArg;
import com.palantir.remoting3.tracing.okhttp3.OkhttpTraceInterceptor;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state
Expand All @@ -49,20 +54,39 @@
* success. If this is not done, a deadlock could result.
*/
final class ConcurrencyLimiters {
private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimiters.class);
private static final Duration TIMEOUT = Duration.ofMinutes(1);
private static final Void NO_CONTEXT = null;
private static final String FALLBACK = "";

private final ConcurrentMap<String, Limiter<Void>> limiters = new ConcurrentHashMap<>();

Limiter.Listener limiter(Request request) {
return limiter(limiterKey(request));
}

@VisibleForTesting
Limiter.Listener limiter(String name) {
return limiters.computeIfAbsent(name, key ->
new IdempotentLimiter(new BlockingLimiter<>(RemotingConcurrencyLimiter.createDefault())))
.acquire(NO_CONTEXT).get();
Limiter<Void> limiter = limiters.computeIfAbsent(name, key -> newLimiter());
Optional<Limiter.Listener> listener = limiter.acquire(NO_CONTEXT);
return listener.orElseGet(() -> {
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("Thread was interrupted");
}
log.warn("Timed out waiting to get permits for concurrency. In most cases this would indicate "
+ "some kind of deadlock. We expect that either this is caused by not closing response "
+ "bodies (there should be OkHttp log lines indicating this), or service overloading.",
SafeArg.of("timeout", TIMEOUT));
limiters.replace(name, limiter, newLimiter());
return limiter(name);
});
}

Limiter.Listener limiter(Request request) {
return limiter(limiterKey(request));
private static Limiter<Void> newLimiter() {
Limiter<Void> limiter = SimpleLimiter.newBuilder()
.limit(RemotingWindowedLimit.newBuilder().build(VegasLimit.newDefault()))
.build();
return BlockingLimiter.wrap(limiter, TIMEOUT);
}

private static String limiterKey(Request request) {
Expand All @@ -73,50 +97,4 @@ private static String limiterKey(Request request) {
return request.method() + " " + pathTemplate;
}
}

private static final class IdempotentLimiter implements Limiter<Void> {
private final Limiter<Void> delegate;

private IdempotentLimiter(Limiter<Void> delegate) {
this.delegate = delegate;
}

@Override
public Optional<Listener> acquire(Void context) {
return delegate.acquire(context).map(IdempotentListener::new);
}
}

private static final class IdempotentListener implements Limiter.Listener {
private final Limiter.Listener delegate;
private boolean consumed = false;

private IdempotentListener(Limiter.Listener delegate) {
this.delegate = delegate;
}

@Override
public void onSuccess() {
if (!consumed) {
delegate.onSuccess();
}
consumed = true;
}

@Override
public void onIgnore() {
if (!consumed) {
delegate.onIgnore();
}
consumed = true;
}

@Override
public void onDropped() {
if (!consumed) {
delegate.onDropped();
}
consumed = true;
}
}
}
Loading