Skip to content

Commit 4eda600

Browse files
committed
store: fix deduplication in proxy heap for series with multiple chunks
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent bfd3296 commit 4eda600

File tree

2 files changed

+68
-12
lines changed

2 files changed

+68
-12
lines changed

pkg/store/proxy_heap.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package store
66
import (
77
"container/heap"
88
"context"
9+
"encoding/binary"
910
"fmt"
1011
"io"
1112
"sort"
@@ -111,24 +112,30 @@ func (d *dedupResponseHeap) Next() bool {
111112
func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb.SeriesResponse {
112113
chunkDedupMap := map[uint64]*storepb.AggrChunk{}
113114

115+
buf := make([]byte, 8)
116+
hasher := xxhash.New()
117+
114118
for _, s := range series {
115119
for _, chk := range s.GetSeries().Chunks {
120+
hasher.Reset()
116121
for _, field := range []*storepb.Chunk{
117122
chk.Raw, chk.Count, chk.Max, chk.Min, chk.Sum, chk.Counter,
118123
} {
119-
if field == nil {
120-
continue
121-
}
122-
hash := field.Hash
123-
if hash == 0 {
124-
hash = xxhash.Sum64(field.Data)
125-
}
126-
127-
if _, ok := chunkDedupMap[hash]; !ok {
128-
chk := chk
129-
chunkDedupMap[hash] = &chk
130-
break
124+
switch {
125+
case field == nil:
126+
binary.BigEndian.PutUint64(buf, 0)
127+
case field.Hash != 0:
128+
binary.BigEndian.PutUint64(buf, field.Hash)
129+
default:
130+
binary.BigEndian.PutUint64(buf, xxhash.Sum64(field.Data))
131131
}
132+
_, _ = hasher.Write(buf)
133+
}
134+
aggrChunkHash := hasher.Sum64()
135+
if _, ok := chunkDedupMap[aggrChunkHash]; !ok {
136+
chk := chk
137+
chunkDedupMap[aggrChunkHash] = &chk
138+
break
132139
}
133140
}
134141
}

pkg/store/proxy_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,6 +2072,55 @@ func TestDedupRespHeap_Deduplication(t *testing.T) {
20722072
testFn func(responses []*storepb.SeriesResponse, h *dedupResponseHeap)
20732073
tname string
20742074
}{
2075+
{
2076+
tname: "dedups identical series with multiple chunks",
2077+
responses: []*storepb.SeriesResponse{
2078+
{
2079+
Result: &storepb.SeriesResponse_Series{
2080+
Series: &storepb.Series{
2081+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2082+
Chunks: []storepb.AggrChunk{
2083+
{
2084+
Count: &storepb.Chunk{
2085+
Type: storepb.Chunk_XOR,
2086+
Data: []byte(`count`),
2087+
},
2088+
Sum: &storepb.Chunk{
2089+
Type: storepb.Chunk_XOR,
2090+
Data: []byte(`sum`),
2091+
},
2092+
},
2093+
},
2094+
},
2095+
},
2096+
},
2097+
{
2098+
Result: &storepb.SeriesResponse_Series{
2099+
Series: &storepb.Series{
2100+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
2101+
Chunks: []storepb.AggrChunk{
2102+
{
2103+
Count: &storepb.Chunk{
2104+
Type: storepb.Chunk_XOR,
2105+
Data: []byte(`count`),
2106+
},
2107+
Sum: &storepb.Chunk{
2108+
Type: storepb.Chunk_XOR,
2109+
Data: []byte(`sum`),
2110+
},
2111+
},
2112+
},
2113+
},
2114+
},
2115+
},
2116+
},
2117+
testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) {
2118+
testutil.Equals(t, true, h.Next())
2119+
resp := h.At()
2120+
testutil.Equals(t, responses[0], resp)
2121+
testutil.Equals(t, false, h.Next())
2122+
},
2123+
},
20752124
{
20762125
tname: "edge case with zero responses",
20772126
responses: []*storepb.SeriesResponse{},

0 commit comments

Comments
 (0)