Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ website/docs-pre-processed/
# React build assets
pkg/ui/static/react

/tmp
tmp/bin
examples/tmp/

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6222](https://github.com/thanos-io/thanos/pull/6222) mixin(Receive): Fix tenant series received charts.
- [#6218](https://github.com/thanos-io/thanos/pull/6218) mixin(Store): handle ResourceExhausted as a non-server error. As a consequence, this error won't contribute to Store's grpc errors alerts.
- [#6271](https://github.com/thanos-io/thanos/pull/6271) Receive: Fix segfault in `LabelValues` during head compaction.
- [#6229](https://github.com/thanos-io/thanos/pull/6229) Compact: Fixed issue with no-compact-mark.json being ignored

### Changed
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func runCompact(
metadata.HashFunc(conf.hashFunc),
conf.blockFilesConcurrency,
conf.compactBlocksFetchConcurrency,
noCompactMarkerFilter,
)
tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
Expand Down
11 changes: 11 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ type DefaultGrouper struct {
hashFunc metadata.HashFunc
blockFilesConcurrency int
compactBlocksFetchConcurrency int
noCompactionMarkFilter *GatherNoCompactionMarkFilter
}

// NewDefaultGrouper makes a new DefaultGrouper.
Expand All @@ -247,6 +248,7 @@ func NewDefaultGrouper(
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
compactBlocksFetchConcurrency int,
noCompactionMarkFilter *GatherNoCompactionMarkFilter,
) *DefaultGrouper {
return &DefaultGrouper{
bkt: bkt,
Expand Down Expand Up @@ -279,14 +281,23 @@ func NewDefaultGrouper(
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
compactBlocksFetchConcurrency: compactBlocksFetchConcurrency,
noCompactionMarkFilter: noCompactionMarkFilter,
}
}

// Groups returns the compaction groups for all blocks currently known to the syncer.
// It creates all groups from the scratch on every call.
func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Group, err error) {
groups := map[string]*Group{}
noCompactMarkedMap := g.noCompactionMarkFilter.NoCompactMarkedBlocks()
for _, m := range blocks {
if _, ok := noCompactMarkedMap[m.ULID]; ok {
level.Debug(g.logger).Log(
"msg", "groupper: skipping block marked for no compaction",
"ulid", m.ULID.String())

continue
}
groupKey := m.Thanos.GroupKey()
group, ok := groups[groupKey]
if !ok {
Expand Down
18 changes: 12 additions & 6 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -25,8 +26,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/objtesting"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/dedup"
Expand Down Expand Up @@ -103,6 +102,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(nil, objstore.WithNoopInstr(bkt), fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
testutil.Ok(t, sy.GarbageCollect(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 10, 10)
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 10, 10, noCompactMarkerFilter)
groups, err := grouper.Groups(sy.Metas())
testutil.Ok(t, err)

Expand Down Expand Up @@ -211,7 +211,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Ok(t, err)

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10, noCompactMarkerFilter)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
testutil.Ok(t, err)

Expand Down Expand Up @@ -279,6 +279,12 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
{{Name: "a", Value: "7"}},
},
},
{
numSamples: 100, mint: 6000, maxt: 7000, extLset: extLabels, res: 124,
series: []labels.Labels{
{{Name: "a", Value: "8"}},
},
},
// Second group (extLabels2).
{
numSamples: 100, mint: 2000, maxt: 3000, extLset: extLabels2, res: 124,
Expand Down Expand Up @@ -317,7 +323,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
})

groupKey1 := metas[0].Thanos.GroupKey()
groupKey2 := metas[6].Thanos.GroupKey()
groupKey2 := metas[7].Thanos.GroupKey()

testutil.Ok(t, bComp.Compact(ctx))
testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
Expand Down Expand Up @@ -409,7 +415,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Equals(t, uint64(5), meta.Stats.NumSeries)
testutil.Equals(t, uint64(2*4*100-100), meta.Stats.NumSamples)
testutil.Equals(t, 2, meta.Compaction.Level)
testutil.Equals(t, []ulid.ULID{metas[6].ULID, metas[7].ULID}, meta.Compaction.Sources)
testutil.Equals(t, []ulid.ULID{metas[7].ULID, metas[8].ULID}, meta.Compaction.Sources)

// Check thanos meta.
testutil.Assert(t, labels.Equal(extLabels2, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
Expand Down
9 changes: 6 additions & 3 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ func TestRetentionProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1, noCompactMarkerFilter)

type groupedResult map[string]float64

Expand Down Expand Up @@ -376,7 +377,8 @@ func TestCompactProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1, noCompactMarkerFilter)

for _, tcase := range []struct {
testName string
Expand Down Expand Up @@ -498,7 +500,8 @@ func TestDownsampleProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1, 1, noCompactMarkerFilter)

for _, tcase := range []struct {
testName string
Expand Down
14 changes: 7 additions & 7 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}},
Expand Down Expand Up @@ -574,7 +574,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}},
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}},
Expand Down Expand Up @@ -697,7 +697,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
testutil.Ok(t, c.WaitSumMetrics(e2emon.Greater(0), "thanos_compact_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(2*4+2+2*3+2), "thanos_compact_blocks_marked_total")) // 18.
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(2*4+2+2*3+2+1), "thanos_compact_blocks_marked_total")) // 19.
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(6), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(3), "thanos_compact_group_vertical_compactions_total"))
Expand Down Expand Up @@ -766,7 +766,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(18), []string{"thanos_compact_blocks_cleaned_total"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(19), []string{"thanos_compact_blocks_cleaned_total"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_block_cleanup_failures_total"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_blocks_marked_total"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_aborted_partial_uploads_deletion_attempts_total"}, e2emon.WaitMissingMetrics()))
Expand All @@ -779,7 +779,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_downsample_failures_total"}, e2emon.WaitMissingMetrics()))

testutil.Ok(t, str.WaitSumMetricsWithOptions(e2emon.Equals(float64(len(rawBlockIDs)+8+6-18-2+2)), []string{"thanos_blocks_meta_synced"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, str.WaitSumMetricsWithOptions(e2emon.Equals(float64(len(rawBlockIDs)+8+6-18-2+2-1)), []string{"thanos_blocks_meta_synced"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, str.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_blocks_meta_sync_failures_total"}, e2emon.WaitMissingMetrics()))

testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_halted"}, e2emon.WaitMissingMetrics()))
Expand All @@ -802,7 +802,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
)

// Store view:
testutil.Ok(t, str.WaitSumMetrics(e2emon.Equals(float64(len(rawBlockIDs)+8-18+6-2+2)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2emon.Equals(float64(len(rawBlockIDs)+8-18+6-2+2-1)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_modified"))
}
Expand Down Expand Up @@ -836,7 +836,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
testutil.Ok(t, str.Stop())
testutil.Ok(t, e2e.StartAndWaitReady(str))
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
return str.WaitSumMetrics(e2emon.Equals(float64(len(rawBlockIDs)+8-18+6-2+2-1)), "thanos_blocks_meta_synced")
return str.WaitSumMetrics(e2emon.Equals(float64(len(rawBlockIDs)+8-18+6-2+2-1-1)), "thanos_blocks_meta_synced")
}))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics(), e2emon.WithLabelMatchers(
matchers.MustNewMatcher(matchers.MatchEqual, "store_type", "store"),
Expand Down