diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index c74c2290ad6..6f0f14604bb 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -226,8 +226,8 @@ func (c *Syncer) syncMetas(ctx context.Context) error { lset := promlables.FromMap(meta.Thanos.Labels) processedLabels := relabel.Process(lset, c.relabelConfig...) if processedLabels == nil { - level.Warn(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) - return + level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) + continue } c.blocksMtx.Lock() diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 727f1a37a51..ae363a5c349 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/objtesting" "github.com/thanos-io/thanos/pkg/testutil" + "gopkg.in/yaml.v2" ) func TestSyncer_SyncMetas_e2e(t *testing.T) { @@ -74,7 +75,6 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, ids[5:], groups[0].IDs()) }) - } func TestSyncer_GarbageCollect_e2e(t *testing.T) { @@ -358,3 +358,86 @@ func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, return uid, nil } + +func TestSyncer_SyncMetasFilter_e2e(t *testing.T) { + var err error + + relabelContentYaml := ` + - action: drop + regex: "A" + source_labels: + - cluster + ` + var relabelConfig []*relabel.Config + err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) + testutil.Ok(t, err) + + extLsets := []labels.Labels{{{Name: "cluster", Value: "A"}}, {{Name: "cluster", Value: "B"}}} + + objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig) + testutil.Ok(t, err) + + var ids []ulid.ULID + var metas []*metadata.Meta + + for i := 0; i < 16; i++ { + id, err := ulid.New(uint64(i), nil) + testutil.Ok(t, err) + + var meta metadata.Meta + meta.Version = 1 + meta.ULID = id + meta.Thanos = metadata.Thanos{ + Labels: extLsets[i%2].Map(), + } + + ids = append(ids, id) + metas = append(metas, &meta) + } + for _, m := range metas[:10] { + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) + } + + testutil.Ok(t, sy.SyncMetas(ctx)) + + groups, err := sy.Groups() + testutil.Ok(t, err) + var evenIds []ulid.ULID + for i := 0; i < 10; i++ { + if i%2 != 0 { + evenIds = append(evenIds, ids[i]) + } + } + testutil.Equals(t, evenIds, groups[0].IDs()) + + // Upload last 6 blocks. + for _, m := range metas[10:] { + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) + } + + // Delete first 4 blocks. + for _, m := range metas[:4] { + testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, m.ULID)) + } + + testutil.Ok(t, sy.SyncMetas(ctx)) + + groups, err = sy.Groups() + testutil.Ok(t, err) + evenIds = make([]ulid.ULID, 0) + for i := 4; i < 16; i++ { + if i%2 != 0 { + evenIds = append(evenIds, ids[i]) + } + } + testutil.Equals(t, evenIds, groups[0].IDs()) + }) +} diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 1c4d006cf8e..433a5498679 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -126,7 +126,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -138,7 +138,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m labels.FromStrings("a", "2", "c", "2"), } extLset := labels.FromStrings("ext1", "value1") - relabelConfig := make([]*relabel.Config, 0) blocks, minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset) @@ -393,7 +392,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig) defer s.Close() t.Log("Test with no index cache") @@ -442,7 +441,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0) + s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig) defer s.Close() indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{ @@ -476,7 +475,6 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { labels.FromStrings("a", "1", "b", "2"), } extLset := labels.FromStrings("ext1", "value1") - relabelConfig := make([]*relabel.Config, 0) _, minTime, _ := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset) @@ -487,7 +485,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, - }, relabelConfig) + }, emptyRelabelConfig) testutil.Ok(t, err) err = store.SyncBlocks(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e22c4f754bf..2c984af976d 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -31,6 +31,8 @@ import ( "gopkg.in/yaml.v2" ) +var emptyRelabelConfig = make([]*relabel.Config, 0) + func TestBucketBlock_Property(t *testing.T) { parameters := gopter.DefaultTestParameters() parameters.Rng.Seed(2000) @@ -428,8 +430,7 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "bucketstore-test") testutil.Ok(t, err) - relabelConfig := make([]*relabel.Config, 0) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig) + bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, emptyRelabelConfig) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) @@ -447,7 +448,6 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} extLset := labels.FromStrings("ext1", "value1") - relabelConfig := make([]*relabel.Config, 0) // Create a block in range [-2w, -1w]. id1, err := testutil.CreateBlock(ctx, dir, series, 10, @@ -488,7 +488,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: hourBefore, - }, relabelConfig) + }, emptyRelabelConfig) testutil.Ok(t, err) inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1) @@ -579,3 +579,61 @@ func TestBucketStore_selectorBlocks(t *testing.T) { testutil.Equals(t, sc.exceptedIds, ids) } } + +func TestBucketStore_InfoWithLabels(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dir, err := ioutil.TempDir("", "bucketstore-test") + testutil.Ok(t, err) + + bkt := inmem.NewBucket() + series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} + + logger := log.NewNopLogger() + id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename))) + + id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename))) + + id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename))) + + relabelContentYaml := ` + - action: drop + regex: "A" + source_labels: + - cluster + ` + var relabelConfig []*relabel.Config + err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) + testutil.Ok(t, err) + bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig) + testutil.Ok(t, err) + + err = bucketStore.SyncBlocks(ctx) + testutil.Ok(t, err) + + resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) + testutil.Ok(t, err) + + testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) + testutil.Equals(t, int64(0), resp.MinTime) + testutil.Equals(t, int64(1000), resp.MaxTime) + testutil.Equals(t, []storepb.LabelSet{ + storepb.LabelSet{ + Labels: []storepb.Label{ + { + Name: "cluster", + Value: "B", + }, + }, + }, + }, resp.LabelSets) +} diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 3200da34570..65c636771de 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -243,7 +243,7 @@ func querier(http, grpc address, storeAddresses []address, fileSDStoreAddresses } } -func storeGateway(http, grpc address, bucketConfig []byte) *serverScheduler { +func storeGateway(http, grpc address, bucketConfig []byte, relabelConfig []byte) *serverScheduler { return &serverScheduler{ HTTP: http, GRPC: grpc, @@ -264,6 +264,7 @@ func storeGateway(http, grpc address, bucketConfig []byte) *serverScheduler { "--objstore.config", string(bucketConfig), // Accelerated sync time for quicker test (3m by default). "--sync-block-duration", "5s", + "--selector.relabel-config", string(relabelConfig), )), nil }, } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index abf1b00b693..5cf5078afe2 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -42,7 +42,13 @@ func TestStoreGateway(t *testing.T) { config, err := yaml.Marshal(bucketConfig) testutil.Ok(t, err) - s := storeGateway(a.New(), a.New(), config) + relabelContentYaml := ` + - action: drop + regex: "value2" + source_labels: + - ext1 + ` + s := storeGateway(a.New(), a.New(), config, []byte(relabelContentYaml)) q := querier(a.New(), a.New(), []address{s.GRPC}, nil) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -67,6 +73,7 @@ func TestStoreGateway(t *testing.T) { } extLset := labels.FromStrings("ext1", "value1", "replica", "1") extLset2 := labels.FromStrings("ext1", "value1", "replica", "2") + extLset3 := labels.FromStrings("ext1", "value2", "replica", "3") now := time.Now() id1, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0) @@ -75,6 +82,9 @@ func TestStoreGateway(t *testing.T) { id2, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset2, 0) testutil.Ok(t, err) + id3, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset3, 0) + testutil.Ok(t, err) + l := log.NewLogfmtLogger(os.Stdout) bkt, err := s3.NewBucketWithConfig(l, s3Config, "test-feed") @@ -82,6 +92,7 @@ func TestStoreGateway(t *testing.T) { testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String())) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id2.String()), id2.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String())) var res model.Vector