Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package dedup

import (
"math"
"os"
"strconv"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -15,6 +17,39 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
)

var (
	// initialPenalty is the penalty applied to the series that was not picked while no
	// delta between consecutive samples is known yet.
	// Once a delta is known, the series we didn't pick instead gets a penalty twice as
	// high as the delta of the last two samples on the next seek against it. That keeps
	// us from picking a sample that is too close, which would increase the overall sample
	// frequency, and it guards against clock drift and inaccuracies during timestamp
	// assignment.
	// The default of 5000 is based on the knowledge that timestamps are in milliseconds
	// and that sampling intervals are typically multiple seconds long.
	initialPenalty = int64(5000)

	// minPenalty is the minimum penalty assigned to the series that was picked, which
	// makes it possible to skip points that are particularly close together.
	// With the default of 0 all points are traversed. When it is set to a value greater
	// than 0, traversal skips ahead by minPenalty+1 ms.
	// More details: https://github.com/thanos-io/thanos/pull/8061#issue-2794728444
	minPenalty int64
)

// init applies the optional environment overrides for the deduplication penalties.
//
// THANOS_DEDUP_MIN_PENALTY overrides minPenalty and THANOS_DEDUP_INIT_PENALTY overrides
// initialPenalty. A variable that is unset, empty, not a base-10 integer, or negative
// is ignored, and the current value is kept.
func init() {
	minPenalty = penaltyFromEnv("THANOS_DEDUP_MIN_PENALTY", minPenalty)
	initialPenalty = penaltyFromEnv("THANOS_DEDUP_INIT_PENALTY", initialPenalty)
}

// penaltyFromEnv returns the non-negative base-10 integer held by the environment
// variable name. It returns fallback when the variable is unset or empty, cannot be
// parsed as an int64, or holds a negative value.
func penaltyFromEnv(name string, fallback int64) int64 {
	raw := os.Getenv(name)
	if raw == "" {
		return fallback
	}
	// Parse with bitSize 64 so the accepted range matches the int64 penalties on every
	// platform; bitSize 0 would restrict it to the size of int.
	v, err := strconv.ParseInt(raw, 10, 64)
	if err != nil || v < 0 {
		return fallback
	}
	return v
}

type dedupSeriesSet struct {
set storage.SeriesSet
isCounter bool
Expand Down Expand Up @@ -318,15 +353,15 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
if it.bval != chunkenc.ValNone {
it.lastT = it.b.AtT()
it.lastIter = it.b
it.penB = 0
it.penB = minPenalty
}
return it.bval
}
if it.bval == chunkenc.ValNone {
it.useA = true
it.lastT = it.a.AtT()
it.lastIter = it.a
it.penA = 0
it.penA = minPenalty
return it.aval
}
// General case where both iterators still have data. We pick the one
Expand All @@ -338,22 +373,13 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {

it.useA = ta <= tb

// For the series we didn't pick, add a penalty twice as high as the delta of the last two
// samples to the next seek against it.
// This ensures that we don't pick a sample too close, which would increase the overall
// sample frequency. It also guards against clock drift and inaccuracies during
// timestamp assignment.
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenalty = 5000

if it.useA {
if it.lastT != math.MinInt64 {
it.penB = 2 * (ta - it.lastT)
} else {
it.penB = initialPenalty
}
it.penA = 0
it.penA = minPenalty
it.lastT = ta
it.lastIter = it.a

Expand All @@ -364,7 +390,7 @@ func (it *dedupSeriesIterator) Next() chunkenc.ValueType {
} else {
it.penA = initialPenalty
}
it.penB = 0
it.penB = minPenalty
it.lastT = tb
it.lastIter = it.b
return it.bval
Expand Down