Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page
- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

Expand Down
94 changes: 55 additions & 39 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
Expand Down Expand Up @@ -82,7 +82,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden())
forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
Expand Down Expand Up @@ -163,8 +163,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*replicaHeader,
*replicationFactor,
time.Duration(*forwardTimeout),
comp,
*allowOutOfOrderUpload,
comp,
)
}
}
Expand Down Expand Up @@ -202,8 +202,8 @@ func runReceive(
replicaHeader string,
replicationFactor uint64,
forwardTimeout time.Duration,
comp component.SourceStoreAPI,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive")
Expand Down Expand Up @@ -294,6 +294,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up tsdb")
{
log.With(logger, "component", "storage")
dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_attempted_total",
Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
Expand All @@ -311,12 +312,17 @@ func runReceive(

// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
level.Info(logger).Log("msg", "shutting down storage")
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
} else {
level.Info(logger).Log("msg", "storage is flushed successfully")
}
if err := dbs.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
level.Error(logger).Log("err", err, "msg", "failed to close storage")
return
}
level.Info(logger).Log("msg", "storage is closed")
}()

for {
Expand All @@ -328,7 +334,7 @@ func runReceive(
return nil
}
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating Multi DB")
level.Info(logger).Log("msg", "updating storage")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
Expand All @@ -341,7 +347,7 @@ func runReceive(
<-uploadDone
}
statusProber.Ready()
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests")
dbUpdatesCompleted.Inc()
dbReady <- struct{}{}
}
Expand Down Expand Up @@ -394,7 +400,7 @@ func runReceive(
return nil
}
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
msg := "hashring has changed; server is not ready to receive web requests"
statusProber.NotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
hashringChangedChan <- struct{}{}
Expand Down Expand Up @@ -489,57 +495,67 @@ func runReceive(
}

if upload {
level.Debug(logger).Log("msg", "upload enabled")
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
}
logger := log.With(logger, "component", "uploader")
upload := func(ctx context.Context) error {
level.Debug(logger).Log("msg", "upload starting")
start := time.Now()

if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err)
return err
}
level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start))
return nil
}
{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "interval upload failed", "err", err)
}

return nil
})
}, func(error) {
cancel()
})
level.Info(logger).Log("msg", "upload enabled, starting initial sync")
if err := upload(context.Background()); err != nil {
return errors.Wrap(err, "initial upload failed")
}
level.Info(logger).Log("msg", "initial sync done")
}

{
// Upload on demand.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Ensure we clean up everything properly.
defer func() {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}()

// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "on demnad upload failed", "err", err)
<-uploadC // Closed by storage routine when it's done.
level.Info(logger).Log("msg", "uploading the final cut block before exiting")
ctx, cancel := context.WithCancel(context.Background())
if err := dbs.Sync(ctx); err != nil {
cancel()
level.Error(logger).Log("msg", "the final upload failed", "err", err)
return
}
cancel()
level.Info(logger).Log("msg", "the final cut block was uploaded")
}()

defer close(uploadDone)

// Run the uploader in a loop.
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()

for {
select {
case <-ctx.Done():
return nil
default:
}
select {
case <-ctx.Done():
return nil
case <-uploadC:
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err)
// Upload on demand.
if err := upload(ctx); err != nil {
level.Warn(logger).Log("msg", "on demand upload failed", "err", err)
}
uploadDone <- struct{}{}
case <-tick.C:
if err := upload(ctx); err != nil {
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
}
}
}
}, func(error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
)

type MultiTSDB struct {
Expand Down Expand Up @@ -57,7 +58,7 @@ func NewMultiTSDB(

return &MultiTSDB{
dataDir: dataDir,
logger: l,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
Expand Down
16 changes: 10 additions & 6 deletions pkg/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
grpc_health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

// A Server defines parameters to serve RPC requests, a wrapper around grpc.Server.
Expand Down Expand Up @@ -123,10 +124,11 @@ func (s *Server) ListenAndServe() error {
// Shutdown gracefully shuts down the server by waiting,
// for specified amount of time (by gracePeriod) for connections to return to idle and then shut down.
func (s *Server) Shutdown(err error) {
defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err)
level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err)

if s.opts.gracePeriod == 0 {
s.srv.Stop()
level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err)
return
}

Expand All @@ -144,9 +146,11 @@ func (s *Server) Shutdown(err error) {
case <-ctx.Done():
level.Info(s.logger).Log("msg", "grace period exceeded enforcing shutdown")
s.srv.Stop()
return
case <-stopped:
cancel()
}
level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err)
}

// ReadWriteStoreServer is a StoreServer and a WriteableStoreServer.
Expand Down
7 changes: 5 additions & 2 deletions pkg/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
)
Expand Down Expand Up @@ -60,15 +61,15 @@ func (s *Server) ListenAndServe() error {
// Shutdown gracefully shuts down the server by waiting,
// for specified amount of time (by gracePeriod) for connections to return to idle and then shut down.
func (s *Server) Shutdown(err error) {
level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err)
if err == http.ErrServerClosed {
level.Warn(s.logger).Log("msg", "internal server closed unexpectedly")
return
}

defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err)

if s.opts.gracePeriod == 0 {
s.srv.Close()
level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err)
return
}

Expand All @@ -77,7 +78,9 @@ func (s *Server) Shutdown(err error) {

if err := s.srv.Shutdown(ctx); err != nil {
level.Error(s.logger).Log("msg", "internal server shut down failed", "err", err)
return
}
level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err)
}

// Handle registers the handler for the given pattern.
Expand Down