Skip to content

Commit 68b4eff

Browse files
committed
cmd/thanos/compact: add bucket UI
This commit enhances the compact component so that it runs the bucket UI whenever the --wait flag is also passed. In order to reduce the overhead of running the UI in addition to the compactor, this commit also introduces an abstraction around downloading block meta files allowing the metadata to be downloaded once and cached. This ensures that the compactor does not unnecessarily download every metadata file twice. Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
1 parent f86cf81 commit 68b4eff

File tree

8 files changed

+104
-29
lines changed

8 files changed

+104
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1818
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
1919
- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.
2020
- [#1712](https://github.com/thanos-io/thanos/pull/1712) Rename flag on bucket web component from `--listen` to `--http-address` to match other components.
21+
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
2122

2223
### Fixed
2324

cmd/thanos/bucket.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
316316
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()
317317

318318
m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
319-
ctx, cancel := context.WithCancel(context.Background())
320-
321319
statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
322320
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
323321
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
@@ -342,8 +340,20 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
342340
level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
343341
}
344342

343+
confContentYaml, err := objStoreConfig.Content()
344+
if err != nil {
345+
return err
346+
}
347+
348+
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
349+
if err != nil {
350+
return errors.Wrap(err, "bucket client")
351+
}
352+
353+
ctx, cancel := context.WithCancel(context.Background())
345354
g.Add(func() error {
346-
return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig)
355+
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
356+
return refresh(ctx, logger, bucketUI, *interval, *timeout, bkt, block.MetaDownloaderFn(block.DownloadMeta))
347357
}, func(error) {
348358
cancel()
349359
})
@@ -354,25 +364,14 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
354364
}
355365
}
356366

357-
// refresh metadata from remote storage periodically and update UI.
358-
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
359-
confContentYaml, err := objStoreConfig.Content()
360-
if err != nil {
361-
return err
362-
}
363-
364-
bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
365-
if err != nil {
366-
return errors.Wrap(err, "bucket client")
367-
}
368-
369-
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
367+
// refresh metadata from remote storage periodically and update the UI.
368+
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, bkt objstore.Bucket, metaDownloader block.MetaDownloader) error {
370369
return runutil.Repeat(duration, ctx.Done(), func() error {
371370
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
372371
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
373372
defer iterCancel()
374373

375-
blocks, err := download(iterCtx, logger, bkt)
374+
blocks, err := download(iterCtx, logger, bkt, metaDownloader)
376375
if err != nil {
377376
bucketUI.Set("[]", err)
378377
return err
@@ -389,7 +388,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
389388
})
390389
}
391390

392-
func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (blocks []metadata.Meta, err error) {
391+
func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, metaDownloader block.MetaDownloader) (blocks []metadata.Meta, err error) {
393392
level.Info(logger).Log("msg", "synchronizing block metadata")
394393

395394
if err = bkt.Iter(ctx, "", func(name string) error {
@@ -398,7 +397,7 @@ func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (bloc
398397
return nil
399398
}
400399

401-
meta, err := block.DownloadMeta(ctx, logger, bkt, id)
400+
meta, err := metaDownloader.DownloadMeta(ctx, logger, bkt, id)
402401
if err != nil {
403402
return err
404403
}

cmd/thanos/compact.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@ import (
1616
"github.com/opentracing/opentracing-go"
1717
"github.com/pkg/errors"
1818
"github.com/prometheus/client_golang/prometheus"
19+
"github.com/prometheus/common/route"
1920
"github.com/prometheus/prometheus/tsdb"
2021
"github.com/thanos-io/thanos/pkg/block"
2122
"github.com/thanos-io/thanos/pkg/block/metadata"
2223
"github.com/thanos-io/thanos/pkg/compact"
2324
"github.com/thanos-io/thanos/pkg/compact/downsample"
2425
"github.com/thanos-io/thanos/pkg/component"
2526
"github.com/thanos-io/thanos/pkg/extflag"
27+
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
2628
"github.com/thanos-io/thanos/pkg/objstore"
2729
"github.com/thanos-io/thanos/pkg/objstore/client"
2830
"github.com/thanos-io/thanos/pkg/prober"
2931
"github.com/thanos-io/thanos/pkg/runutil"
3032
httpserver "github.com/thanos-io/thanos/pkg/server/http"
33+
"github.com/thanos-io/thanos/pkg/ui"
3134
"gopkg.in/alecthomas/kingpin.v2"
3235
)
3336

@@ -113,6 +116,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
113116

114117
selectorRelabelConf := regSelectorRelabelFlags(cmd)
115118

119+
label := cmd.Flag("bucket-web-label", "Prometheus label to use as timeline title in the bucket web UI").String()
120+
116121
m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
117122
return runCompact(g, logger, reg,
118123
*httpAddr,
@@ -135,6 +140,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
135140
*blockSyncConcurrency,
136141
*compactionConcurrency,
137142
selectorRelabelConf,
143+
*label,
138144
)
139145
}
140146
}
@@ -159,6 +165,7 @@ func runCompact(
159165
blockSyncConcurrency int,
160166
concurrency int,
161167
selectorRelabelConf *extflag.PathOrContent,
168+
label string,
162169
) error {
163170
halted := prometheus.NewGauge(prometheus.GaugeOpts{
164171
Name: "thanos_compactor_halted",
@@ -211,8 +218,9 @@ func runCompact(
211218
}
212219
}()
213220

221+
metaDownloader := block.NewCachingMetaDownloader(block.MetaDownloaderFn(block.DownloadMeta))
214222
sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
215-
blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
223+
blockSyncConcurrency, acceptMalformedIndex, relabelConfig, metaDownloader)
216224
if err != nil {
217225
return errors.Wrap(err, "create syncer")
218226
}
@@ -295,6 +303,9 @@ func runCompact(
295303
return nil
296304
}
297305

306+
// Compaction and bucket refresh interval.
307+
interval := 5 * time.Minute
308+
298309
g.Add(func() error {
299310
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
300311

@@ -310,7 +321,7 @@ func runCompact(
310321
}
311322

312323
// --wait=true is specified.
313-
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
324+
return runutil.Repeat(interval, ctx.Done(), func() error {
314325
err := f()
315326
if err == nil {
316327
return nil
@@ -343,6 +354,19 @@ func runCompact(
343354
cancel()
344355
})
345356

357+
if wait {
358+
router := route.New()
359+
bucketUI := ui.NewBucketUI(logger, label)
360+
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
361+
srv.Handle("/", router)
362+
363+
g.Add(func() error {
364+
return refresh(ctx, logger, bucketUI, interval, time.Minute, bkt, metaDownloader)
365+
}, func(error) {
366+
cancel()
367+
})
368+
}
369+
346370
level.Info(logger).Log("msg", "starting compact node")
347371
statusProber.SetReady()
348372
return nil

docs/components/compact.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,5 +135,8 @@ Flags:
135135
selecting blocks. It follows native Prometheus
136136
relabel-config syntax. See format details:
137137
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
138+
--bucket-web-label=BUCKET-WEB-LABEL
139+
Prometheus label to use as timeline title in the
140+
bucket web UI
138141
139142
```

pkg/block/block.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path"
1111
"path/filepath"
1212
"strings"
13+
"sync"
1314

1415
"github.com/go-kit/kit/log/level"
1516

@@ -188,3 +189,47 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
188189
id, err := ulid.Parse(filepath.Base(path))
189190
return id, err == nil
190191
}
192+
193+
// MetaDownloader abstracts anything that can download block metas.
// DownloadMeta receives a context, a logger, a bucket and a block ULID, and
// returns the metadata.Meta for that block or an error.
194+
type MetaDownloader interface {
195+
DownloadMeta(context.Context, log.Logger, objstore.Bucket, ulid.ULID) (metadata.Meta, error)
196+
}
197+
198+
// MetaDownloaderFn turns any func that downloads metas into a MetaDownloader.
// It is a function adapter: a plain function with this signature can be
// converted with MetaDownloaderFn(f) and then used wherever a MetaDownloader
// is expected.
199+
type MetaDownloaderFn func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error)
200+
201+
// DownloadMeta implements the MetaDownloader interface.
// It calls the wrapped function with the arguments unchanged and returns its
// result as-is.
202+
func (m MetaDownloaderFn) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
203+
return m(ctx, logger, bkt, id)
204+
}
205+
206+
// CachingMetaDownloader is a MetaDownloader that can cache metas.
// Metas are stored in memory keyed by block ULID; lookups that miss the cache
// are delegated to the wrapped MetaDownloader.
207+
type CachingMetaDownloader struct {
208+
// Embedded mutex; DownloadMeta locks it around every access to metas.
sync.Mutex
209+
// next is the downloader consulted when a block is not in the cache.
next MetaDownloader
210+
// metas holds the metas returned by successful downloads, keyed by block ULID.
metas map[ulid.ULID]metadata.Meta
211+
}
212+
213+
// NewCachingMetaDownloader creates a new MetaDownloader that can cache metas
214+
// and uses the given MetaDownloader when something is not in the cache.
// The cache starts out empty. The value is returned as the MetaDownloader
// interface rather than as *CachingMetaDownloader.
215+
func NewCachingMetaDownloader(next MetaDownloader) MetaDownloader {
216+
return &CachingMetaDownloader{
217+
next: next,
218+
metas: make(map[ulid.ULID]metadata.Meta),
219+
}
220+
}
221+
222+
// DownloadMeta implements the MetaDownloader interface.
// It returns the cached meta for id when one exists; otherwise it delegates to
// the wrapped MetaDownloader and stores the result only if that call succeeds,
// so failed downloads are attempted again on the next call.
//
// NOTE(review): the mutex is held for the whole call, including the delegated
// download, so concurrent callers are serialised behind one another — confirm
// this is acceptable for callers that sync metas concurrently.
// NOTE(review): nothing in the code shown here ever removes an entry from the
// cache — confirm that unbounded growth and stale entries for deleted blocks
// are not a concern.
223+
func (c *CachingMetaDownloader) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
224+
c.Lock()
225+
defer c.Unlock()
226+
// Fast path: serve a previously downloaded meta without calling next.
m, ok := c.metas[id]
227+
if ok {
228+
return m, nil
229+
}
230+
// Cache miss: fetch through the wrapped downloader while still holding the lock.
m, err := c.next.DownloadMeta(ctx, logger, bkt, id)
231+
if err == nil {
232+
// Only successful results are cached.
c.metas[id] = m
233+
}
234+
return m, err
235+
}

pkg/compact/compact.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type Syncer struct {
5353
metrics *syncerMetrics
5454
acceptMalformedIndex bool
5555
relabelConfig []*relabel.Config
56+
metaDownloader block.MetaDownloader
5657
}
5758

5859
type syncerMetrics struct {
@@ -145,7 +146,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
145146

146147
// NewSyncer returns a new Syncer for the given Bucket and directory.
147148
// Blocks must be at least as old as the sync delay for being considered.
148-
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
149+
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config, md block.MetaDownloader) (*Syncer, error) {
149150
if logger == nil {
150151
logger = log.NewNopLogger()
151152
}
@@ -155,6 +156,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
155156
consistencyDelay: consistencyDelay,
156157
blocks: map[ulid.ULID]*metadata.Meta{},
157158
bkt: bkt,
159+
metaDownloader: md,
158160
metrics: newSyncerMetrics(reg),
159161
blockSyncConcurrency: blockSyncConcurrency,
160162
acceptMalformedIndex: acceptMalformedIndex,
@@ -291,7 +293,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
291293
func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
292294
level.Debug(c.logger).Log("msg", "download meta", "block", id)
293295

294-
meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
296+
meta, err := c.metaDownloader.DownloadMeta(ctx, c.logger, c.bkt, id)
295297
if err != nil {
296298
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
297299
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
@@ -356,7 +358,7 @@ func groupKey(res int64, lbls labels.Labels) string {
356358
}
357359

358360
// Groups returns the compaction groups for all blocks currently known to the syncer.
359-
// It creates all groups from the scratch on every call.
361+
// It creates all groups from scratch on every call.
360362
func (c *Syncer) Groups() (res []*Group, err error) {
361363
c.mtx.Lock()
362364
defer c.mtx.Unlock()

pkg/compact/compact_e2e_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
3737
defer cancel()
3838

3939
relabelConfig := make([]*relabel.Config, 0)
40-
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
40+
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
4141
testutil.Ok(t, err)
4242

4343
// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
@@ -140,7 +140,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
140140
}
141141

142142
// Do one initial synchronization with the bucket.
143-
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
143+
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
144144
testutil.Ok(t, err)
145145
testutil.Ok(t, sy.SyncMetas(ctx))
146146

@@ -209,7 +209,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
209209

210210
reg := prometheus.NewRegistry()
211211

212-
sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil)
212+
sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil, block.MetaDownloaderFn(block.DownloadMeta))
213213
testutil.Ok(t, err)
214214

215215
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
@@ -515,7 +515,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
515515
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
516516
defer cancel()
517517

518-
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
518+
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
519519
testutil.Ok(t, err)
520520

521521
var ids []ulid.ULID

pkg/compact/compact_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/thanos-io/thanos/pkg/block"
1011
"github.com/thanos-io/thanos/pkg/block/metadata"
1112

1213
"github.com/oklog/ulid"
@@ -79,7 +80,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
7980

8081
bkt := inmem.NewBucket()
8182
relabelConfig := make([]*relabel.Config, 0)
82-
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig)
83+
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
8384
testutil.Ok(t, err)
8485

8586
// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.

0 commit comments

Comments
 (0)