Skip to content

Commit bab4e1b

Browse files
committed
Add timeout for all uploads
Merge uploader routines. Address review issues. Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
1 parent 642b609 commit bab4e1b

File tree

2 files changed

+40
-35
lines changed

2 files changed

+40
-35
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
2222

2323
- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page
2424
- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
25+
- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add a timeout for each block upload; the receiver now fails when the initial upload fails.
2526

2627
## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10
2728

cmd/thanos/receive.go

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
8282

8383
replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()
8484

85-
forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden())
85+
forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())
8686

87-
uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for the initial and last upload request.").Default("10m").Hidden())
87+
uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for each block upload request.").Default("10m").Hidden())
8888

8989
tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
9090
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
@@ -316,15 +316,17 @@ func runReceive(
316316

317317
// Before quitting, ensure the WAL is flushed and the DBs are closed.
318318
defer func() {
319-
level.Info(logger).Log("msg", "shutting down Multi TSDB")
319+
level.Info(logger).Log("msg", "shutting down storage")
320320
if err := dbs.Flush(); err != nil {
321321
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
322+
} else {
323+
level.Info(logger).Log("msg", "storage is flushed successfully")
322324
}
323325
if err := dbs.Close(); err != nil {
324326
level.Error(logger).Log("err", err, "msg", "failed to close storage")
325327
return
326328
}
327-
level.Info(logger).Log("msg", "Multi TSDB is closed")
329+
level.Info(logger).Log("msg", "storage is closed")
328330
}()
329331

330332
for {
@@ -336,7 +338,7 @@ func runReceive(
336338
return nil
337339
}
338340
dbUpdatesStarted.Inc()
339-
level.Info(logger).Log("msg", "updating Multi TSDB")
341+
level.Info(logger).Log("msg", "updating storage")
340342

341343
if err := dbs.Flush(); err != nil {
342344
return errors.Wrap(err, "flushing storage")
@@ -503,64 +505,66 @@ func runReceive(
503505
ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout)
504506
if err := dbs.Sync(ctx); err != nil {
505507
cancel()
506-
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
508+
return errors.Wrapf(err, "initial upload failed")
507509
}
508510
cancel()
509511
level.Info(logger).Log("msg", "initial sync done")
510512
}
511513
{
512-
// Run the uploader in a loop.
513-
ctx, cancel := context.WithCancel(context.Background())
514-
g.Add(func() error {
515-
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
516-
level.Debug(logger).Log("msg", "recurring upload starting")
517-
if err := dbs.Sync(ctx); err != nil {
518-
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
519-
}
520-
level.Debug(logger).Log("msg", "upload done")
521-
return nil
522-
})
523-
}, func(error) {
524-
cancel()
525-
})
526-
}
527-
528-
{
529-
// Upload on demand.
530514
ctx, cancel := context.WithCancel(context.Background())
531515
g.Add(func() error {
532516
// Ensure we clean up everything properly.
533517
defer func() {
534518
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
535519
}()
520+
536521
// Before quitting, ensure all blocks are uploaded.
537522
defer func() {
538-
<-uploadC
539-
level.Info(logger).Log("msg", "uploading the last cut block before exiting")
523+
<-uploadC // Closed by storage routine when it's done.
524+
level.Info(logger).Log("msg", "uploading the final cut block before exiting")
540525
dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout)
541526
if err := dbs.Sync(dctx); err != nil {
542527
dCancel()
543-
level.Error(logger).Log("msg", "on demand upload failed", "err", err)
528+
level.Error(logger).Log("msg", "the final upload failed", "err", err)
544529
return
545530
}
546531
dCancel()
547-
level.Info(logger).Log("msg", "the last cut block is uploaded")
532+
level.Info(logger).Log("msg", "the final cut block was uploaded")
548533
}()
534+
549535
defer close(uploadDone)
550-
for {
551-
select {
552-
case <-ctx.Done():
553-
return nil
554-
default:
536+
537+
upload := func(ctx context.Context) error {
538+
level.Debug(logger).Log("msg", "upload starting")
539+
ctx, cancel := context.WithTimeout(ctx, uploadTimeout)
540+
defer cancel()
541+
542+
if err := dbs.Sync(ctx); err != nil {
543+
level.Warn(logger).Log("msg", "upload failed", "err", err)
544+
} else {
545+
level.Debug(logger).Log("msg", "upload done")
555546
}
547+
return nil
548+
}
549+
550+
// Run the uploader in a loop.
551+
tick := time.NewTicker(30 * time.Second)
552+
defer tick.Stop()
553+
554+
for {
556555
select {
557556
case <-ctx.Done():
558557
return nil
559558
case <-uploadC:
560-
if err := dbs.Sync(ctx); err != nil {
561-
level.Warn(logger).Log("err", err)
559+
// Upload on demand.
560+
if err := upload(ctx); err != nil {
561+
level.Warn(logger).Log("msg", "on demand upload failed", "err", err)
562562
}
563563
uploadDone <- struct{}{}
564+
case <-tick.C:
565+
if err := upload(ctx); err != nil {
566+
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
567+
}
564568
}
565569
}
566570
}, func(error) {

0 commit comments

Comments
 (0)