Skip to content

Commit 3debaeb

Browse files
alfred-landrum authored and bwplotka committed
store the first raw value of a chunk during downsampling (#1709)
store the first raw value of a chunk during downsampling (#1709)

* store the first raw value of a chunk during downsampling

As discussed in #1568, storing only the last raw value of a chunk will lose a counter reset when: a) the reset occurs at a chunk boundary, and b) the last raw value of the earlier chunk is less than the first aggregated value of the later chunk.

This commit stores the first raw value of a chunk during the initial raw aggregation, and retains it during subsequent aggregations. This is similar to the existing handling for the last raw value of a chunk.

With this change, when counterSeriesIterator iterates over a chunk boundary, it will see the last raw value of the earlier chunk, then the first raw value of the later chunk, and then the first aggregated value of the later chunk. The first raw value will always be less than or equal to the first aggregated value, so the only difference in counterSeriesIterator's output will be the possible detection of a reset and an extra sample after the chunk boundary.

Fixes: #1568

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>

* changelog for #1709

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>

* adjust existing downsampling tests

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>

* add counter aggregation comments to CounterSeriesIterator

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>
1 parent 1acdf7c commit 3debaeb

File tree

3 files changed

+169
-31
lines changed

3 files changed

+169
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
2424
- [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`.
2525
- [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.jsons and load/fetch index-cache.json files.
2626
- [#1670](https://github.com/thanos-io/thanos/pull/1670) Fixed un-ordered blocks upload. Sidecar now uploads the oldest blocks first.
27+
- [#1709](https://github.com/thanos-io/thanos/pull/1709) Thanos Store now retains the first raw value of a chunk during downsampling to avoid losing some counter resets that occur on an aggregation boundary.
2728

2829
### Changed
2930

pkg/compact/downsample/downsample.go

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,6 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
289289
b.added++
290290
}
291291

292-
func (b *aggrChunkBuilder) finalizeChunk(lastT int64, trueSample float64) {
293-
b.apps[AggrCounter].Append(lastT, trueSample)
294-
}
295-
296292
func (b *aggrChunkBuilder) encode() chunks.Meta {
297293
return chunks.Meta{
298294
MinTime: b.mint,
@@ -306,14 +302,17 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
306302
if len(data) == 0 {
307303
return nil
308304
}
309-
var (
310-
mint, maxt = data[0].t, data[len(data)-1].t
311-
// We assume a raw resolution of 1 minute. In practice it will often be lower
312-
// but this is sufficient for our heuristic to produce well-sized chunks.
313-
numChunks = targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data))
314-
chks = make([]chunks.Meta, 0, numChunks)
315-
batchSize = (len(data) / numChunks) + 1
316-
)
305+
306+
mint, maxt := data[0].t, data[len(data)-1].t
307+
// We assume a raw resolution of 1 minute. In practice it will often be lower
308+
// but this is sufficient for our heuristic to produce well-sized chunks.
309+
numChunks := targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data))
310+
return downsampleRawLoop(data, resolution, numChunks)
311+
}
312+
313+
func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.Meta {
314+
batchSize := (len(data) / numChunks) + 1
315+
chks := make([]chunks.Meta, 0, numChunks)
317316

318317
for len(data) > 0 {
319318
j := batchSize
@@ -327,14 +326,18 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
327326
for ; j < len(data) && data[j].t <= curW; j++ {
328327
}
329328

330-
ab := newAggrChunkBuilder()
331329
batch := data[:j]
332330
data = data[j:]
333331

332+
ab := newAggrChunkBuilder()
333+
334+
// Encode first raw value; see CounterSeriesIterator.
335+
ab.apps[AggrCounter].Append(batch[0].t, batch[0].v)
336+
334337
lastT := downsampleBatch(batch, resolution, ab.add)
335338

336-
// InjectThanosMeta the chunk's counter aggregate with the last true sample.
337-
ab.finalizeChunk(lastT, batch[len(batch)-1].v)
339+
// Encode last raw value; see CounterSeriesIterator.
340+
ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v)
338341

339342
chks = append(chks, ab.encode())
340343
}
@@ -379,18 +382,21 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato
379382

380383
// downsampleAggr downsamples a sequence of aggregation chunks to the given resolution.
381384
func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes int64) ([]chunks.Meta, error) {
382-
// We downsample aggregates only along chunk boundaries. This is required for counters
383-
// to be downsampled correctly since a chunks' last counter value is the true last value
384-
// of the original series. We need to preserve it even across multiple aggregation iterations.
385385
var numSamples int
386386
for _, c := range chks {
387387
numSamples += c.NumSamples()
388388
}
389-
var (
390-
numChunks = targetChunkCount(mint, maxt, inRes, outRes, numSamples)
391-
res = make([]chunks.Meta, 0, numChunks)
392-
batchSize = len(chks) / numChunks
393-
)
389+
numChunks := targetChunkCount(mint, maxt, inRes, outRes, numSamples)
390+
return downsampleAggrLoop(chks, buf, outRes, numChunks)
391+
}
392+
393+
func downsampleAggrLoop(chks []*AggrChunk, buf *[]sample, resolution int64, numChunks int) ([]chunks.Meta, error) {
394+
// We downsample aggregates only along chunk boundaries. This is required
395+
// for counters to be downsampled correctly since a chunk's first and last
396+
// counter values are the true values of the original series. We need
397+
// to preserve them even across multiple aggregation iterations.
398+
res := make([]chunks.Meta, 0, numChunks)
399+
batchSize := len(chks) / numChunks
394400

395401
for len(chks) > 0 {
396402
j := batchSize
@@ -400,12 +406,13 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes
400406
part := chks[:j]
401407
chks = chks[j:]
402408

403-
chk, err := downsampleAggrBatch(part, buf, outRes)
409+
chk, err := downsampleAggrBatch(part, buf, resolution)
404410
if err != nil {
405411
return nil, err
406412
}
407413
res = append(res, chk)
408414
}
415+
409416
return res, nil
410417
}
411418

@@ -512,6 +519,9 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
512519
ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
513520
ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()
514521

522+
// Retain first raw value; see CounterSeriesIterator.
523+
ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v)
524+
515525
lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
516526
if t < mint {
517527
mint = t
@@ -520,6 +530,8 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
520530
}
521531
ab.apps[AggrCounter].Append(t, a.counter)
522532
})
533+
534+
// Retain last raw value; see CounterSeriesIterator.
523535
ab.apps[AggrCounter].Append(lastT, it.lastV)
524536

525537
ab.mint = mint
@@ -532,11 +544,18 @@ type sample struct {
532544
v float64
533545
}
534546

535-
// CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing
536-
// values as counter reset.
537-
// Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk
538-
// to the original last value. The last value can be detected by checking whether the timestamp
539-
// did not increase w.r.t to the previous sample.
547+
// CounterSeriesIterator generates monotonically increasing values by iterating
548+
// over an ordered sequence of chunks, which should be raw or aggregated chunks
549+
// of counter values. The generated samples can be used by PromQL functions
550+
// like 'rate' that calculate differences between counter values.
551+
//
552+
// Counter aggregation chunks must have the first and last values from their
553+
// original raw series: the first raw value should be the first value encoded
554+
// in the chunk, and the last raw value is encoded by the duplication of the
555+
// previous sample's timestamp. As iteration occurs between chunks, the
556+
// comparison between the last raw value of the earlier chunk and the first raw
557+
// value of the later chunk ensures that counter resets between chunks are
558+
// recognized and that the correct value delta is calculated.
540559
type CounterSeriesIterator struct {
541560
chks []chunkenc.Iterator
542561
i int // Current chunk.

pkg/compact/downsample/downsample_test.go

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,124 @@ import (
2323
"github.com/thanos-io/thanos/pkg/testutil"
2424
)
2525

26+
func TestDownsampleCounterBoundaryReset(t *testing.T) {
27+
28+
toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) {
29+
for i := range cm {
30+
achk, ok := cm[i].Chunk.(*AggrChunk)
31+
testutil.Assert(t, ok, "expected *AggrChunk")
32+
res = append(res, achk)
33+
}
34+
return
35+
}
36+
37+
counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) {
38+
for _, achk := range achks {
39+
chk, err := achk.Get(AggrCounter)
40+
testutil.Ok(t, err)
41+
42+
iter := chk.Iterator(nil)
43+
for iter.Next() {
44+
t, v := iter.At()
45+
res = append(res, sample{t, v})
46+
}
47+
}
48+
return
49+
}
50+
51+
counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) {
52+
var iters []chunkenc.Iterator
53+
for _, achk := range achks {
54+
chk, err := achk.Get(AggrCounter)
55+
testutil.Ok(t, err)
56+
iters = append(iters, chk.Iterator(nil))
57+
}
58+
59+
citer := NewCounterSeriesIterator(iters...)
60+
for citer.Next() {
61+
t, v := citer.At()
62+
res = append(res, sample{t: t, v: v})
63+
}
64+
return
65+
}
66+
67+
type test struct {
68+
raw []sample
69+
rawAggrResolution int64
70+
expectedRawAggrChunks int
71+
rawCounterSamples []sample
72+
rawCounterIterate []sample
73+
aggrAggrResolution int64
74+
aggrChunks int
75+
aggrCounterSamples []sample
76+
aggrCounterIterate []sample
77+
}
78+
79+
tests := []test{
80+
{
81+
// In this test case, counter resets occur at the
82+
// boundaries between the t=49,t=99 and t=99,t=149
83+
// windows, and the values in the t=49, t=99, and
84+
// t=149 windows are high enough that the resets
85+
// will only be accounted for if the first raw value
86+
// of a chunk is maintained during aggregation.
87+
// See #1568 for more details.
88+
raw: []sample{
89+
{t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5},
90+
{t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10},
91+
{t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20},
92+
{t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40},
93+
},
94+
rawAggrResolution: 50,
95+
expectedRawAggrChunks: 4,
96+
rawCounterSamples: []sample{
97+
{t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5},
98+
{t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10},
99+
{t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20},
100+
{t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40},
101+
},
102+
rawCounterIterate: []sample{
103+
{t: 10, v: 1}, {t: 30, v: 5},
104+
{t: 50, v: 6}, {t: 70, v: 15},
105+
{t: 120, v: 16}, {t: 140, v: 35},
106+
{t: 160, v: 36}, {t: 180, v: 55},
107+
},
108+
aggrAggrResolution: 2 * 50,
109+
aggrChunks: 2,
110+
aggrCounterSamples: []sample{
111+
{t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10},
112+
{t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40},
113+
},
114+
aggrCounterIterate: []sample{
115+
{t: 10, v: 1}, {t: 70, v: 15},
116+
{t: 120, v: 16}, {t: 180, v: 55},
117+
},
118+
},
119+
}
120+
121+
doTest := func(t *testing.T, test *test) {
122+
// Asking for more chunks than raw samples ensures that downsampleRawLoop
123+
// will create chunks with samples from a single window.
124+
cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1)
125+
testutil.Equals(t, test.expectedRawAggrChunks, len(cm))
126+
127+
rawAggrChunks := toAggrChunks(t, cm)
128+
testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks))
129+
testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks))
130+
131+
var buf []sample
132+
acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks)
133+
testutil.Ok(t, err)
134+
testutil.Equals(t, test.aggrChunks, len(acm))
135+
136+
aggrAggrChunks := toAggrChunks(t, acm)
137+
testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks))
138+
testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks))
139+
}
140+
141+
doTest(t, &tests[0])
142+
}
143+
26144
func TestExpandChunkIterator(t *testing.T) {
27145
// Validate that expanding the chunk iterator filters out-of-order samples
28146
// and staleness markers.
@@ -56,7 +174,7 @@ func TestDownsampleRaw(t *testing.T) {
56174
AggrSum: {{99, 7}, {199, 17}, {250, 1}},
57175
AggrMin: {{99, 1}, {199, 2}, {250, 1}},
58176
AggrMax: {{99, 3}, {199, 10}, {250, 1}},
59-
AggrCounter: {{99, 4}, {199, 13}, {250, 14}, {250, 1}},
177+
AggrCounter: {{20, 1}, {99, 4}, {199, 13}, {250, 14}, {250, 1}},
60178
},
61179
},
62180
}
@@ -93,7 +211,7 @@ func TestDownsampleAggr(t *testing.T) {
93211
AggrSum: {{499, 29}, {999, 100}},
94212
AggrMin: {{499, -3}, {999, 0}},
95213
AggrMax: {{499, 10}, {999, 100}},
96-
AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
214+
AggrCounter: {{99, 100}, {499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
97215
},
98216
},
99217
}

0 commit comments

Comments
 (0)