Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2dcd409
Better behaviour in the presence of backing off
j-baker Aug 2, 2018
c583b33
Merge branch 'develop' into jbaker/better_429_behaviour
j-baker Aug 2, 2018
d51d3ac
more docs, easier to read
j-baker Aug 2, 2018
19a8173
Better concurrency limiters
j-baker Aug 3, 2018
d7164de
fixes
j-baker Aug 3, 2018
39388b7
simplify
j-baker Aug 3, 2018
f2927d0
Checkstyle
j-baker Aug 13, 2018
8f57de6
checkstyle
j-baker Aug 13, 2018
8f02b8b
PR comments
j-baker Sep 2, 2018
fbbcc41
Tweak the concurrency limiters lib
j-baker Sep 2, 2018
27a2153
reset flow control test
j-baker Sep 2, 2018
e548996
changes
j-baker Sep 2, 2018
a4c9e68
Changes
j-baker Sep 2, 2018
c68bc89
Passes the build
j-baker Sep 2, 2018
91dd6d2
update lockfiles
j-baker Sep 2, 2018
2229a81
Merge remote-tracking branch 'origin/develop' into jbaker/better_429_…
j-baker Sep 11, 2018
ee8e539
New attempt using interceptor
j-baker Sep 11, 2018
036d45b
more comments
j-baker Sep 11, 2018
1e18435
some bullshit
j-baker Sep 12, 2018
fbdeab1
Perfect
j-baker Sep 12, 2018
951dfdd
cleanup
j-baker Sep 12, 2018
ed18c48
Ready to go?
j-baker Sep 13, 2018
fda4d64
docs
j-baker Sep 13, 2018
8a088d7
Checkstyle
j-baker Sep 13, 2018
a9721e4
chekcstyle
j-baker Sep 13, 2018
1ce7b82
Metric
j-baker Sep 13, 2018
bc38b76
Javadoc
iamdanfox Sep 13, 2018
addbdca
README describes new flow control
iamdanfox Sep 13, 2018
c97925a
Move docs -> class level javadoc
iamdanfox Sep 13, 2018
baaa142
Rename ConcurrencyLimiters#limiter -> acquireLimiter
iamdanfox Sep 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixes
  • Loading branch information
j-baker committed Aug 3, 2018
commit d7164dec5e7ea66daff39a656646b609997cae38
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final class ClientConfigurations {
private static final Duration DEFAULT_FAILED_URL_COOLDOWN = Duration.ZERO;
private static final boolean DEFAULT_ENABLE_GCM_CIPHERS = false;
private static final NodeSelectionStrategy DEFAULT_NODE_SELECTION_STRATEGY = NodeSelectionStrategy.PIN_UNTIL_ERROR;
private static final int DEFAULT_MAX_NUM_RETRIES = 3;
private static final int DEFAULT_MAX_NUM_RETRIES = 4;

private ClientConfigurations() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.netflix.concurrency.limits.Limiter;
Expand Down Expand Up @@ -113,17 +112,15 @@ static final class ConcurrencyLimiter {
private final Queue<SettableFuture<Limiter.Listener>> waitingRequests = new LinkedBlockingQueue<>();
private final Limiter<Void> limiter;

public ConcurrencyLimiter(Limiter<Void> limiter) {
@VisibleForTesting
ConcurrencyLimiter(Limiter<Void> limiter) {
this.limiter = limiter;
}

public ListenableFuture<Limiter.Listener> acquire() {
Optional<Limiter.Listener> maybeListener = limiter.acquire(null);
if (maybeListener.isPresent()) {
return Futures.immediateFuture(wrap(activeListeners, maybeListener.get()));
}
SettableFuture<Limiter.Listener> future = SettableFuture.create();
waitingRequests.add(future);
processQueue();
return future;
}

Expand All @@ -138,13 +135,12 @@ private void processQueue() {
if (head == null) {
acquired.onIgnore();
} else {
head.set(acquired);
head.set(wrap(acquired));
}
}
}

private Limiter.Listener wrap(
Map<Limiter.Listener, Runnable> activeListeners, Limiter.Listener listener) {
private Limiter.Listener wrap(Limiter.Listener listener) {
Limiter.Listener res = new Limiter.Listener() {
@Override
public void onSuccess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.concurrency.limits.Limiter;
import com.palantir.remoting3.okhttp.ConcurrencyLimiters.ConcurrencyLimiter;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import jersey.repackaged.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
Expand Down Expand Up @@ -74,63 +54,4 @@ public void testRetriesCorrectNumberOfTimesAndFindsRandomBackoffWithInExponentia

assertThat(backoff.nextBackoff()).isEmpty();
}

@Test
public void testBackingOff() throws ExecutionException, InterruptedException {
int numThreads = 160;
int numRequestsPerSecond = 40;
int numRetries = 3;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("test-name-%d").build()));
RateLimiter rateLimiter = RateLimiter.create(numRequestsPerSecond);

ConcurrencyLimiter limiter = new ConcurrencyLimiters().limiter("");
Meter meter = new Meter();
Histogram backoffHistogram = new Histogram(new ExponentiallyDecayingReservoir());

List<ListenableFuture<?>> futures = IntStream.range(0, numThreads).mapToObj(x -> executorService.submit(new Runnable() {
ExponentialBackoff backoff;
int backoffIndex = 0;

@Override
public void run() {
for (int i = 0; i < 1001;) {
Limiter.Listener listener = Futures.getUnchecked(limiter.acquire());
//System.out.println(i);
boolean gotRateLimited = !rateLimiter.tryAcquire();
if (!gotRateLimited) {
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
listener.onSuccess();
meter.mark();
if (i++ % 1 == 0) {
System.out.println("i " + i + " avg " + meter.getMeanRate() + " avgbackoffs " + backoffHistogram.getSnapshot().getMean());
}
backoffHistogram.update(backoffIndex);
backoff = null;
backoffIndex = 0;
} else {
initializeBackoff();
Optional<Duration> sleep = backoff.nextBackoff();
if (!sleep.isPresent()) {
throw new RuntimeException("i " + i);
} else {
System.out.println("Backoff: " + ++backoffIndex);
Uninterruptibles.sleepUninterruptibly(sleep.get().toMillis(), TimeUnit.MILLISECONDS);
}
listener.onDropped();
}
}
System.out.println("done");
}

private void initializeBackoff() {
if (backoff != null) {
return;
}
backoff = new ExponentialBackoff(numRetries, Duration.ofMillis(250), ThreadLocalRandom.current());
}
})).collect(Collectors.toList());

Futures.allAsList(futures).get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.remoting3.okhttp;

import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import com.netflix.concurrency.limits.Limiter;
import com.palantir.remoting3.okhttp.ConcurrencyLimiters.ConcurrencyLimiter;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is a simulation of the flow control primitives used by this library, in order to allow the developer
* to try different strategies.
* <p>
* It is run in CI, but only to prevent code breakages - this is in general a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"...in general a"?

*/
public final class FlowControlTest {
private static final Logger log = LoggerFactory.getLogger(FlowControlTest.class);
private static final int REQUESTS_PER_THREAD = System.getenv("CI") == null ? 1000 : 1;
private static final ConcurrencyLimiters limiters = new ConcurrencyLimiters();
private static ListeningExecutorService executorService;

private final ConcurrencyLimiter limiter = limiters.limiter(UUID.randomUUID().toString());

@BeforeClass
public static void beforeClass() {
executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
}

@AfterClass
public static void afterClass() {
executorService.shutdown();
}

@Test
public void test16ThreadsRateLimit20() throws ExecutionException, InterruptedException {
Meter rate = new Meter();
List<ListenableFuture<?>> tasks = createWorkers(rate, 16, 20, Duration.ofMillis(100))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would also be interesting to see what happens when you add more workers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out the number of workers doesn't actually affect this until the rate limit is also large (because waiting isn't factored into this calculation).

.map(executorService::submit)
.collect(Collectors.toList());
ListenableFuture<?> task = Futures.allAsList(tasks);
while (!task.isDone()) {
sleep(1000);
log.info("Average rate is {}, 1 minute rate is {}", rate.getMeanRate(), rate.getOneMinuteRate());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertThat(rate.getMeanRate()).isBetween(19, 21)? not sure where these values are expected to be but some sort of broad assertion seems reasonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually kinda interesting, you don't expect to necessarily get that close to the maximum.

Basically if the target concurrency rate is small enough, it won't be able to compensate right. So for example in this case, you basically cycle between:

1 thread, 10 requests a second.
2 threads, 20 requests a second, perfect
3 threads, 30 requests a second, fails
1 thread, 10 requests a second
2 thread, 20 requests a second
etc etc

and you basically end up at about 16, overall.

}
task.get();
}

private Stream<Worker> createWorkers(Meter rate, int numThreads, int rateLimit, Duration delay) {
RateLimiter rateLimiter = RateLimiter.create(rateLimit);
return IntStream.range(0, numThreads)
.mapToObj(unused -> new Worker(
() -> new ExponentialBackoff(4, Duration.ofMillis(250), ThreadLocalRandom.current()),
limiter,
delay,
rateLimiter,
rate));
}

private static class Worker implements Runnable {
private final Supplier<BackoffStrategy> backoffFactory;
private final ConcurrencyLimiter limiter;
private final Duration successDuration;
private final RateLimiter rateLimiter;
private final Meter meter;

private BackoffStrategy backoff;

private Worker(
Supplier<BackoffStrategy> backoffFactory,
ConcurrencyLimiter limiter,
Duration successDuration,
RateLimiter rateLimiter,
Meter meter) {
this.backoffFactory = backoffFactory;
this.limiter = limiter;
this.successDuration = successDuration;
this.rateLimiter = rateLimiter;
this.meter = meter;
}

@Override
public void run() {
for (int i = 0; i < REQUESTS_PER_THREAD;) {
Limiter.Listener listener = Futures.getUnchecked(limiter.acquire());
boolean gotRateLimited = !rateLimiter.tryAcquire();
if (!gotRateLimited) {
meter.mark();
sleep(successDuration.toMillis());
listener.onSuccess();
backoff = null;
i++;
} else {
initializeBackoff();
Optional<Duration> sleep = backoff.nextBackoff();
if (!sleep.isPresent()) {
listener.onIgnore();
throw new RuntimeException("Failed on request " + i);
} else {
sleep(sleep.get().toMillis());
}
listener.onDropped();
}
}
}

private void initializeBackoff() {
if (backoff != null) {
return;
}
backoff = backoffFactory.get();
}
}

private static void sleep(long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}