Skip to content

Commit 18c9974

Browse files
committed
block/compact: rework consistency check, make writers only write
- It's weird that on upload errors, we try to clean everything and only then write again. It's an extra operation we don't need since whether a block exists or not hinges on the existence of meta.json. We don't need to delete old, same files before trying to upload them again. - Consequently, we need to always use the _upload_ time, not block creation time when checking for consistency or when deleting partially uploaded blocks. Directories as such don't exist in object storages, it's a client-side "illusion", so we need to iterate through the partial block's directory to fetch the last modified date. Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
1 parent 1b21d71 commit 18c9974

File tree

7 files changed

+71
-33
lines changed

7 files changed

+71
-33
lines changed

pkg/block/block.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -141,39 +141,29 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
141141
return errors.Wrap(err, "gather meta file stats")
142142
}
143143

144-
if err := meta.Write(&metaEncoded); err != nil {
145-
return errors.Wrap(err, "encode meta file")
146-
}
147-
148144
if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil {
149-
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
145+
return errors.Wrap(err, "upload chunks")
150146
}
151147

152148
if err := objstore.UploadFile(ctx, logger, bkt, filepath.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {
153-
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
149+
return errors.Wrap(err, "upload index")
150+
}
151+
152+
meta.Thanos.UploadTime = time.Now().UTC()
153+
if err := meta.Write(&metaEncoded); err != nil {
154+
return errors.Wrap(err, "encode meta file")
154155
}
155156

156157
// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads.
157158
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {
158-
// Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases,
159-
// and even though cleanUp will not see it yet, meta.json may appear in the bucket later.
160-
// (Eg. S3 is known to behave this way when it returns 503 "SlowDown" error).
161-
// If meta.json is not uploaded, this will produce partial blocks, but such blocks will be cleaned later.
159+
// Syncer always checks if meta.json exists in the next iteration and will retry if it does not.
160+
// This is to avoid partial uploads.
162161
return errors.Wrap(err, "upload meta file")
163162
}
164163

165164
return nil
166165
}
167166

168-
func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) error {
169-
// Cleanup the dir with an uncancelable context.
170-
cleanErr := Delete(context.Background(), logger, bkt, id)
171-
if cleanErr != nil {
172-
return errors.Wrapf(err, "failed to clean block after upload issue. Partial block in system. Err: %s", cleanErr.Error())
173-
}
174-
return err
175-
}
176-
177167
// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
178168
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, details string, markedForDeletion prometheus.Counter) error {
179169
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)

pkg/block/fetcher.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,10 +1001,18 @@ func NewConsistencyDelayMetaFilterWithoutMetrics(logger log.Logger, consistencyD
10011001
// Filter filters out blocks that were created before a specified consistency delay.
10021002
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
10031003
for id, meta := range metas {
1004+
var metaUploadTime = meta.Thanos.UploadTime
1005+
1006+
var tooFresh bool
1007+
if !metaUploadTime.IsZero() {
1008+
tooFresh = time.Since(metaUploadTime) < f.consistencyDelay
1009+
} else {
1010+
tooFresh = ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond)
1011+
}
1012+
10041013
// TODO(khyatisoneji): Remove the checks about Thanos Source
10051014
// by implementing delete delay to fetch metas.
1006-
// TODO(bwplotka): Check consistency delay based on file upload / modification time instead of ULID.
1007-
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
1015+
if tooFresh &&
10081016
meta.Thanos.Source != metadata.BucketRepairSource &&
10091017
meta.Thanos.Source != metadata.CompactorSource &&
10101018
meta.Thanos.Source != metadata.CompactorRepairSource {

pkg/block/fetcher_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
10691069
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.SidecarSource}},
10701070
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.ReceiveSource}},
10711071
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.RulerSource}},
1072+
u.ULID(now): {Thanos: metadata.Thanos{UploadTime: time.Now().Add(-20 * time.Hour), Source: metadata.RulerSource}},
10721073
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.BucketRepairSource}},
10731074
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.CompactorSource}},
10741075
u.ULID(now.Add(-20 * time.Hour)): {Thanos: metadata.Thanos{Source: metadata.CompactorRepairSource}},

pkg/block/metadata/meta.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"io"
1515
"os"
1616
"path/filepath"
17+
"time"
1718

1819
"github.com/go-kit/log"
1920
"github.com/oklog/ulid/v2"
@@ -103,6 +104,10 @@ type Thanos struct {
103104

104105
// Extensions are used for plugin any arbitrary additional information for block. Optional.
105106
Extensions any `json:"extensions,omitempty"`
107+
108+
// UploadTime is used to track when the meta.json file was uploaded to the object storage
109+
// without an extra Attributes call. Used for consistency filter.
110+
UploadTime time.Time `json:"upload_time,omitempty"`
106111
}
107112

108113
type IndexStats struct {

pkg/compact/clean.go

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ package compact
55

66
import (
77
"context"
8+
"fmt"
89
"time"
910

1011
"github.com/go-kit/log"
1112
"github.com/go-kit/log/level"
1213
"github.com/oklog/ulid/v2"
1314

1415
"github.com/prometheus/client_golang/prometheus"
16+
"github.com/prometheus/prometheus/model/timestamp"
1517
"github.com/thanos-io/objstore"
1618

1719
"github.com/thanos-io/thanos/pkg/block"
@@ -23,6 +25,34 @@ const (
2325
PartialUploadThresholdAge = 2 * 24 * time.Hour
2426
)
2527

28+
// getOldestModifiedTime returns the most recent last-modified time among the block's objects in the bucket. (NOTE(review): the implementation keeps the latest timestamp via lm.After(...), despite the "oldest" in the name — consider renaming.)
29+
// If it is not possible to get the last modified timestamp then it falls back to the time
30+
// encoded in the block's ULID.
31+
func getOldestModifiedTime(ctx context.Context, blockID ulid.ULID, bkt objstore.Bucket) (time.Time, error) {
32+
var lastModifiedTime time.Time
33+
34+
err := bkt.IterWithAttributes(ctx, bkt.Name(), func(attrs objstore.IterObjectAttributes) error {
35+
lm, ok := attrs.LastModified()
36+
if !ok {
37+
return nil
38+
}
39+
if lm.After(lastModifiedTime) {
40+
lastModifiedTime = lm
41+
}
42+
return nil
43+
}, objstore.WithUpdatedAt(), objstore.WithRecursiveIter())
44+
45+
if err != nil {
46+
return timestamp.Time(int64(blockID.Time())), err
47+
}
48+
49+
if lastModifiedTime.IsZero() {
50+
return timestamp.Time(int64(blockID.Time())), fmt.Errorf("no last modified time found for block %s, using block creation time instead", blockID.String())
51+
}
52+
53+
return lastModifiedTime, nil
54+
}
55+
2656
func BestEffortCleanAbortedPartialUploads(
2757
ctx context.Context,
2858
logger log.Logger,
@@ -34,23 +64,17 @@ func BestEffortCleanAbortedPartialUploads(
3464
) {
3565
level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")
3666

37-
// Delete partial blocks that are older than partialUploadThresholdAge.
38-
// TODO(bwplotka): This is can cause data loss if blocks are:
39-
// * being uploaded longer than partialUploadThresholdAge
40-
// * being uploaded and started after their partialUploadThresholdAge
41-
// can be assumed in this case. Keep partialUploadThresholdAge long for now.
42-
// Mitigate this by adding ModifiedTime to bkt and check that instead of ULID (block creation time).
4367
for id := range partial {
44-
if ulid.Now()-id.Time() <= uint64(PartialUploadThresholdAge/time.Millisecond) {
45-
// Minimum delay has not expired, ignore for now.
68+
lastModifiedTime, err := getOldestModifiedTime(ctx, id, bkt)
69+
if err != nil {
70+
level.Warn(logger).Log("msg", "failed to get last modified time for block; falling back to block creation time", "block", id, "err", err)
71+
}
72+
if time.Since(lastModifiedTime) <= PartialUploadThresholdAge {
4673
continue
4774
}
4875

4976
deleteAttempts.Inc()
50-
level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id)
51-
// We don't gather any information about deletion marks for partial blocks, so let's simply remove it. We waited
52-
// long PartialUploadThresholdAge already.
53-
// TODO(bwplotka): Fix some edge cases: https://github.com/thanos-io/thanos/issues/2470 .
77+
level.Info(logger).Log("msg", "found partially uploaded block; deleting", "block", id)
5478
if err := block.Delete(ctx, logger, bkt, id); err != nil {
5579
blockCleanupFailures.Inc()
5680
level.Warn(logger).Log("msg", "failed to delete aborted partial upload; will retry in next iteration", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)

pkg/compact/clean_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,12 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
8787
testutil.Ok(t, err)
8888
testutil.Equals(t, true, exists)
8989
}
90+
91+
func TestGetLastModifiedTime(t *testing.T) {
92+
now := time.Now().UTC()
93+
u := ulid.MustNewDefault(now)
94+
tm, err := getOldestModifiedTime(context.Background(), u, objstore.NewInMemBucket())
95+
testutil.NotOk(t, err)
96+
// NOTE(GiedriusS): ULIDs encode time with millisecond precision, so truncate to whole seconds before comparing to avoid sub-second differences.
97+
testutil.Equals(t, now.Truncate(time.Second), tm.Truncate(time.Second))
98+
}

pkg/shipper/shipper_e2e_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
154154
}
155155

156156
buf := bytes.Buffer{}
157+
meta.Thanos.UploadTime = time.Time{}
157158
testutil.Ok(t, meta.Write(&buf))
158159

159160
// We will delete the fifth block and do not expect it to be re-uploaded later.

0 commit comments

Comments
 (0)