Skip to content

Commit 84ccd15

Browse files
authored
Querier: dedupe chunks streamed from ingesters (grafana#463)
* Querier: dedupe chunks streamed from ingesters. Samples are replicated to all ingesters, so when we query there is a good chance we receive exactly the same chunk more than once. Deduping at the whole-chunk level is much more efficient than doing it in the chunkMergeIterator later. * Different implementation of accumulateChunks.
1 parent e5ab3d8 commit 84ccd15

2 files changed

Lines changed: 23 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@
113113
* `cortex_querier_blocks_found_total`
114114
* `cortex_querier_blocks_queried_total`
115115
* `cortex_querier_blocks_with_compactor_shard_but_incompatible_query_shard_total`
116-
* [ENHANCEMENT] Querier: reduce latency and peak memory consumption. #459
116+
* [ENHANCEMENT] Querier&Ruler: reduce cpu usage, latency and peak memory consumption. #459 #463
117117
* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #206
118118
* [BUGFIX] Fixes a panic in the query-tee when comparing result. #207
119119
* [BUGFIX] Upgrade Prometheus. TSDB now waits for pending readers before truncating Head block, fixing the `chunk not found` error and preventing wrong query results. #16

pkg/distributor/query.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
239239
key := ingester_client.LabelsToKeyString(mimirpb.FromLabelAdaptersToLabels(series.Labels))
240240
existing := hashToChunkseries[key]
241241
existing.Labels = series.Labels
242-
existing.Chunks = append(existing.Chunks, series.Chunks...)
242+
existing.Chunks = accumulateChunks(existing.Chunks, series.Chunks)
243243
hashToChunkseries[key] = existing
244244
}
245245

@@ -383,3 +383,24 @@ func sameSamples(a, b []mimirpb.Sample) bool {
383383
}
384384
return true
385385
}
386+
387+
// Build a slice of chunks, eliminating duplicates.
388+
// This is O(N^2) but most of the time N is small.
389+
func accumulateChunks(a, b []ingester_client.Chunk) []ingester_client.Chunk {
390+
ret := a
391+
for j := range b {
392+
if !containsChunk(a, b[j]) {
393+
ret = append(ret, b[j])
394+
}
395+
}
396+
return ret
397+
}
398+
399+
func containsChunk(a []ingester_client.Chunk, b ingester_client.Chunk) bool {
400+
for i := range a {
401+
if a[i].Equal(b) {
402+
return true
403+
}
404+
}
405+
return false
406+
}

0 commit comments

Comments
 (0)