Skip to content

Commit 014520a

Browse files
feat: add race tolerance to query-tee (#20228)
Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 8584d92 commit 014520a

File tree

5 files changed

+111
-58
lines changed

5 files changed

+111
-58
lines changed

tools/querytee/fanout_handler.go

Lines changed: 93 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type FanOutHandler struct {
3232
comparator comparator.ResponsesComparator
3333
instrumentCompares bool
3434
enableRace bool
35+
raceTolerance time.Duration
3536
}
3637

3738
// FanOutHandlerConfig holds configuration for creating a FanOutHandler.
@@ -45,6 +46,7 @@ type FanOutHandlerConfig struct {
4546
Comparator comparator.ResponsesComparator
4647
InstrumentCompares bool
4748
EnableRace bool
49+
RaceTolerance time.Duration
4850
}
4951

5052
// NewFanOutHandler creates a new FanOutHandler.
@@ -59,6 +61,7 @@ func NewFanOutHandler(cfg FanOutHandlerConfig) *FanOutHandler {
5961
comparator: cfg.Comparator,
6062
instrumentCompares: cfg.InstrumentCompares,
6163
enableRace: cfg.EnableRace,
64+
raceTolerance: cfg.RaceTolerance,
6265
}
6366
}
6467

@@ -108,69 +111,47 @@ func (h *FanOutHandler) Do(ctx context.Context, req queryrangebase.Request) (que
108111
if err != nil {
109112
return nil, fmt.Errorf("failed to extract tenant IDs: %w", err)
110113
}
111-
shouldSample := false
112-
if h.goldfishManager != nil {
113-
for _, tenant := range tenants {
114-
if h.goldfishManager.ShouldSample(tenant) {
115-
shouldSample = true
116-
level.Debug(h.logger).Log(
117-
"msg", "Goldfish sampling decision",
118-
"tenant", tenant,
119-
"sampled", shouldSample,
120-
"path", httpReq.URL.Path)
121-
break
122-
}
123-
}
124-
}
125-
126-
results := make(chan *backendResult, len(h.backends))
127-
128-
for i, backend := range h.backends {
129-
go func(_ int, b *ProxyBackend) {
130-
result := h.executeBackendRequest(ctx, httpReq, body, b, req)
131-
132-
// ensure a valid status code is set in case of error
133-
if result.err != nil && result.backendResp.status == 0 {
134-
result.backendResp.status = statusCodeFromError(result.err)
135-
}
136-
137-
results <- result
138-
139-
// Record metrics
140-
h.recordMetrics(result, httpReq.Method, issuer)
141-
}(i, backend)
142-
}
143-
114+
shouldSample := h.shouldSample(tenants, httpReq)
115+
results := h.makeBackendRequests(ctx, httpReq, body, req, issuer)
144116
collected := make([]*backendResult, 0, len(h.backends))
145117

146118
for i := 0; i < len(h.backends); i++ {
147119
result := <-results
148120
collected = append(collected, result)
149121

150122
// Race mode: return first successful response from ANY backend
151-
// TODO: move race logic to a separate function, and catch the condition in Do and fan out to two different functions
152-
// rather than have it all nested in one.
153123
if h.enableRace {
154-
// Check if this is the first successful result (race winner)
155124
if result.err == nil && result.backendResp.succeeded() {
156-
// Record race win metric
157-
h.metrics.raceWins.WithLabelValues(
158-
result.backend.name,
159-
h.routeName,
160-
).Inc()
161-
162-
// Spawn goroutine to collect remaining responses
125+
winner := result
163126
remaining := len(h.backends) - i - 1
164-
go func() {
165-
h.collectRemainingAndCompare(remaining, httpReq, results, collected, shouldSample)
166-
}()
167127

168-
return result.response, nil
128+
// If the preferred (v1) backend wins, then apply the handicap to give v2 a
129+
// chance to "win" by finishing within the race tolerance of v1.
130+
if result.backend.preferred && h.raceTolerance > 0 {
131+
select {
132+
case r2 := <-results:
133+
collected = append(collected, r2)
134+
if r2.err == nil && r2.backendResp.succeeded() {
135+
winner = r2
136+
remaining = len(h.backends) - i - 2
137+
}
138+
case <-time.After(h.raceTolerance):
139+
// tolerance expired, fall back to original winner
140+
}
141+
}
142+
143+
return h.finishRace(
144+
winner,
145+
remaining,
146+
httpReq,
147+
results,
148+
collected,
149+
shouldSample)
169150
}
170151
} else {
171-
// Non-race mode: existing logic (wait for preferred)
152+
// Non-race mode: legacy logic (wait for preferred)
172153
if result.backend.preferred {
173-
// when the preferred backend fails (5xx or request error) we return any successful backend response
154+
// when the preferred backend fails return any successful response
174155
if !result.backendResp.succeeded() {
175156
continue
176157
}
@@ -212,6 +193,20 @@ func (h *FanOutHandler) Do(ctx context.Context, req queryrangebase.Request) (que
212193
return nil, fmt.Errorf("all backends failed")
213194
}
214195

196+
// finishRace records the race winner and spawns a goroutine to collect remaining results.
197+
func (h *FanOutHandler) finishRace(winner *backendResult, remaining int, httpReq *http.Request, results <-chan *backendResult, collected []*backendResult, shouldSample bool) (queryrangebase.Response, error) {
198+
h.metrics.raceWins.WithLabelValues(
199+
winner.backend.name,
200+
h.routeName,
201+
).Inc()
202+
203+
go func() {
204+
h.collectRemainingAndCompare(remaining, httpReq, results, collected, shouldSample)
205+
}()
206+
207+
return winner.response, nil
208+
}
209+
215210
// collectRemainingAndCompare collects remaining backend results, performs comparisons,
216211
// and processes goldfish sampling. Should be called asynchronously to not block preferred response from returning.
217212
func (h *FanOutHandler) collectRemainingAndCompare(remaining int, httpReq *http.Request, results <-chan *backendResult, collected []*backendResult, shouldSample bool) {
@@ -430,6 +425,55 @@ func (h *FanOutHandler) WithComparator(comparator comparator.ResponsesComparator
430425
return h
431426
}
432427

428+
// shouldSample determines if a query should be sampled for goldfish comparison.
429+
func (h *FanOutHandler) shouldSample(tenants []string, httpReq *http.Request) bool {
430+
if h.goldfishManager == nil {
431+
return false
432+
}
433+
434+
for _, tenant := range tenants {
435+
if h.goldfishManager.ShouldSample(tenant) {
436+
level.Debug(h.logger).Log(
437+
"msg", "Goldfish sampling decision",
438+
"tenant", tenant,
439+
"sampled", true,
440+
"path", httpReq.URL.Path)
441+
return true
442+
}
443+
}
444+
445+
return false
446+
}
447+
448+
// makeBackendRequests initiates backend requests and returns a channel for receiving results.
449+
func (h *FanOutHandler) makeBackendRequests(
450+
ctx context.Context,
451+
httpReq *http.Request,
452+
body []byte,
453+
req queryrangebase.Request,
454+
issuer string,
455+
) chan *backendResult {
456+
results := make(chan *backendResult, len(h.backends))
457+
458+
for i, backend := range h.backends {
459+
go func(_ int, b *ProxyBackend) {
460+
result := h.executeBackendRequest(ctx, httpReq, body, b, req)
461+
462+
// ensure a valid status code is set in case of error
463+
if result.err != nil && result.backendResp.status == 0 {
464+
result.backendResp.status = statusCodeFromError(result.err)
465+
}
466+
467+
results <- result
468+
469+
// Record metrics
470+
h.recordMetrics(result, httpReq.Method, issuer)
471+
}(i, backend)
472+
}
473+
474+
return results
475+
}
476+
433477
func extractOriginalHeaders(ctx context.Context) http.Header {
434478
if headers, ok := ctx.Value(originalHTTPHeadersKey).(http.Header); ok {
435479
return headers

tools/querytee/fanout_handler_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ func TestFanOutHandler_Do_WithFilter(t *testing.T) {
198198
require.Equal(t, 1, requestCount, "expected only 1 backend to receive request due to filter")
199199
}
200200

201-
func TestFanOutHandler_Do_RaceModeReturnsFirstSuccessful(t *testing.T) {
201+
func TestFanOutHandler_Do_RaceModeReturnsNonPreferredIfWithinTolerance(t *testing.T) {
202202
backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
203-
time.Sleep(50 * time.Millisecond)
203+
time.Sleep(10 * time.Millisecond)
204204
w.Header().Set("Content-Type", "application/json")
205205
w.WriteHeader(http.StatusOK)
206206
_, err := w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[{"stream":{"backend":"1"},"values":[["1000000000","log line 1"]]}]}}`))
@@ -209,7 +209,7 @@ func TestFanOutHandler_Do_RaceModeReturnsFirstSuccessful(t *testing.T) {
209209
defer backend1.Close()
210210

211211
backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
212-
time.Sleep(10 * time.Millisecond)
212+
time.Sleep(50 * time.Millisecond)
213213
w.Header().Set("Content-Type", "application/json")
214214
w.WriteHeader(http.StatusOK)
215215
_, err := w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[{"stream":{"backend":"2"},"values":[["1000000000","log line 2"]]}]}}`))
@@ -226,12 +226,13 @@ func TestFanOutHandler_Do_RaceModeReturnsFirstSuccessful(t *testing.T) {
226226
}
227227

228228
handler := NewFanOutHandler(FanOutHandlerConfig{
229-
Backends: backends,
230-
Codec: queryrange.DefaultCodec,
231-
Logger: log.NewNopLogger(),
232-
Metrics: NewProxyMetrics(prometheus.NewRegistry()),
233-
RouteName: "test_route",
234-
EnableRace: true,
229+
Backends: backends,
230+
Codec: queryrange.DefaultCodec,
231+
Logger: log.NewNopLogger(),
232+
Metrics: NewProxyMetrics(prometheus.NewRegistry()),
233+
RouteName: "test_route",
234+
EnableRace: true,
235+
RaceTolerance: 100 * time.Millisecond,
235236
})
236237

237238
req := &queryrange.LokiRequest{

tools/querytee/handler_factory.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package querytee
22

33
import (
44
"net/http"
5+
"time"
56

67
"github.com/go-kit/log"
78

@@ -22,6 +23,7 @@ type HandlerFactory struct {
2223
enableRace bool
2324
logger log.Logger
2425
metrics *ProxyMetrics
26+
raceTolerance time.Duration
2527
}
2628

2729
// HandlerFactoryConfig holds configuration for creating a HandlerFactory.
@@ -33,6 +35,7 @@ type HandlerFactoryConfig struct {
3335
EnableRace bool
3436
Logger log.Logger
3537
Metrics *ProxyMetrics
38+
RaceTolerance time.Duration
3639
}
3740

3841
// NewHandlerFactory creates a new HandlerFactory.
@@ -45,6 +48,7 @@ func NewHandlerFactory(cfg HandlerFactoryConfig) *HandlerFactory {
4548
enableRace: cfg.EnableRace,
4649
logger: cfg.Logger,
4750
metrics: cfg.Metrics,
51+
raceTolerance: cfg.RaceTolerance,
4852
}
4953
}
5054

tools/querytee/proxy.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type ProxyConfig struct {
4444
RequestURLFilter *regexp.Regexp
4545
InstrumentCompares bool
4646
EnableRace bool
47+
RaceTolerance time.Duration
4748
Goldfish goldfish.Config
4849
}
4950

@@ -66,6 +67,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
6667
})
6768
f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.")
6869
f.BoolVar(&cfg.EnableRace, "proxy.enable-race", false, "When enabled, return the first successful response from any backend instead of waiting for the preferred backend.")
70+
f.DurationVar(&cfg.RaceTolerance, "proxy.race-tolerance", 100*time.Millisecond, "The tolerance for handicapping races in favor of non-preferred backends. If the preferred backend finishes first but a non-preferred backend completes within this tolerance, the non-preferred backend is declared the winner.")
6971

7072
// Register Goldfish configuration flags
7173
cfg.Goldfish.RegisterFlags(f)
@@ -299,6 +301,7 @@ func (p *Proxy) Start() error {
299301
Metrics: p.metrics,
300302
InstrumentCompares: p.cfg.InstrumentCompares,
301303
EnableRace: p.cfg.EnableRace,
304+
RaceTolerance: p.cfg.RaceTolerance,
302305
})
303306
queryHandler, err := routeHandlerFactory.CreateHandler(route.RouteName, comp)
304307
if err != nil {

tools/querytee/proxy_endpoint.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ func (r *BackendResponse) statusCode() int {
388388
return r.status
389389
}
390390

391+
// TODO(twhitney): detectIssuer should also detect Grafana as an issuer
391392
func detectIssuer(r *http.Request) string {
392393
if strings.HasPrefix(r.Header.Get("User-Agent"), "loki-canary") {
393394
return canaryIssuer

0 commit comments

Comments
 (0)