Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func runCompact(
component,
)
api := blocksAPI.NewBlocksAPI(logger, conf.label, flagsMap)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt)
var sy *compact.Syncer
{
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
Expand All @@ -229,6 +230,7 @@ func runCompact(
block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)},
)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
Expand Down Expand Up @@ -280,7 +282,7 @@ func runCompact(

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels), comp, compactDir, bkt, conf.compactionConcurrency)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
Expand Down
27 changes: 16 additions & 11 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ const (
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down Expand Up @@ -111,6 +114,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
reg,
Expand Down Expand Up @@ -782,19 +786,20 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String())
if err == metadata.ErrorDeletionMarkNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
if err != nil {
m := &metadata.DeletionMark{}
if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil {
if errors.Cause(err) == metadata.ErrorMarkerNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalMarker {
level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err)
continue
}
return err
}
f.deletionMarkMap[id] = deletionMark
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {

f.deletionMarkMap[id] = m
if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
delete(metas, id)
}
Expand Down
76 changes: 0 additions & 76 deletions pkg/block/metadata/deletionmark.go

This file was deleted.

81 changes: 0 additions & 81 deletions pkg/block/metadata/deletionmark_test.go

This file was deleted.

115 changes: 115 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"context"
"encoding/json"
"io/ioutil"
"path"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)

const (
// DeletionMarkFilename is the known json filename for optional file storing details about when block is marked for deletion.
// If such file is present in block dir, it means the block is meant to be deleted after certain delay.
DeletionMarkFilename = "deletion-mark.json"
// NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction.
// If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions).
NoCompactMarkFilename = "no-compact-mark.json"

// DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos.
DeletionMarkVersion1 = 1
// NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos.
NoCompactMarkVersion1 = 1
)

var (
// ErrorMarkerNotFound is the error when marker file is not found.
ErrorMarkerNotFound = errors.New("marker not found")
// ErrorUnmarshalMarker is the error when unmarshalling marker JSON file.
// This error can occur because marker has been partially uploaded to block storage
// or the marker file is not a valid json file.
ErrorUnmarshalMarker = errors.New("unmarshal marker JSON")
)

type Marker interface {
markerFilename() string
}

// DeletionMark stores block id and when block was marked for deletion.
type DeletionMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

// DeletionTime is a unix timestamp of when the block was marked to be deleted.
DeletionTime int64 `json:"deletion_time"`
}

func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename }

// NoCompactReason is a reason for a block to be excluded from compaction.
type NoCompactReason string

const (
// ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason.
ManualNoCompactReason NoCompactReason = "manual"
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
type NoCompactMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`

Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
}

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }

// ReadMarker reads the given mark file from <dir>/<marker filename>.json in bucket.
func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error {
markerFile := path.Join(dir, marker.markerFilename())
r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, markerFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return ErrorMarkerNotFound
}
return errors.Wrapf(err, "get file: %s", markerFile)
}
defer runutil.CloseWithLogOnErr(logger, r, "close bkt marker reader")

metaContent, err := ioutil.ReadAll(r)
if err != nil {
return errors.Wrapf(err, "read file: %s", markerFile)
}

if err := json.Unmarshal(metaContent, marker); err != nil {
return errors.Wrapf(ErrorUnmarshalMarker, "file: %s; err: %v", markerFile, err.Error())
}
switch marker.markerFilename() {
case NoCompactMarkFilename:
if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 {
return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1)
}
case DeletionMarkFilename:
if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 {
return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1)
}
}
return nil
}
Loading