Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
)

var blockTooFreshSentinelError = errors.New("Block too fresh")
var emptyCompactedBlockSentinelError = errors.New("Compaction would have created empty block")

// Syncer syncronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
Expand Down Expand Up @@ -779,6 +780,21 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err != nil {
return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
}
if compID == (ulid.ULID{}) {
// Prometheus compactor found that the compacted block would have no samples.
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan))
for _, block := range plan {
meta, err := metadata.Read(block)
if err != nil {
level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block)
continue
}
if meta.Stats.NumSamples == 0 {
cg.deleteBlock(block)
}
}
return compID, emptyCompactedBlockSentinelError
}
level.Debug(cg.logger).Log("msg", "compacted blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

Expand Down Expand Up @@ -818,29 +834,35 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, b := range plan {
id, err := ulid.Parse(filepath.Base(b))
if err != nil {
return compID, errors.Wrapf(err, "plan dir %s", b)
}

if err := os.RemoveAll(b); err != nil {
return compID, errors.Wrapf(err, "remove old block dir %s", id)
}

// Spawn a new context so we always delete a block in full on shutdown.
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id, "result_block", compID)
err = block.Delete(delCtx, cg.bkt, id)
cancel()
if err != nil {
return compID, retry(errors.Wrapf(err, "delete old block %s from bucket ", id))
if err := cg.deleteBlock(b); err != nil {
return compID, retry(errors.Wrapf(err, "delete old block from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}

return compID, nil
}

func (cg *Group) deleteBlock(b string) error {
id, err := ulid.Parse(filepath.Base(b))
if err != nil {
return errors.Wrapf(err, "plan dir %s", b)
}

if err := os.RemoveAll(b); err != nil {
return errors.Wrapf(err, "remove old block dir %s", id)
}

// Spawn a new context so we always delete a block in full on shutdown.
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
if err := block.Delete(delCtx, cg.bkt, id); err != nil {
return errors.Wrapf(err, "delete block %s from bucket", id)
}
return nil
}

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
Expand Down Expand Up @@ -882,6 +904,8 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
return errors.Wrap(err, "garbage")
}

level.Info(c.logger).Log("msg", "start of compaction")

groups, err := c.sy.Groups()
if err != nil {
return errors.Wrap(err, "build compaction groups")
Expand All @@ -898,6 +922,10 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
continue
}

if err == emptyCompactedBlockSentinelError {
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
done = false
Expand Down