Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"
Expand Down Expand Up @@ -205,8 +206,7 @@ func runReceive(
allowOutOfOrderUpload bool,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

level.Warn(logger).Log("msg", "setting up receive")
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
Expand Down Expand Up @@ -285,38 +285,50 @@ func runReceive(

// dbReady signals when TSDB is ready and the Store gRPC server can start.
dbReady := make(chan struct{}, 1)
// updateDB signals when TSDB needs to be flushed and updated.
updateDB := make(chan struct{}, 1)
// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change.
hashringChangedChan := make(chan struct{}, 1)
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_attempted_total",
Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
})
dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_completed_total",
Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
})

// TSDBs reload logic, listening on hashring changes.
cancel := make(chan struct{})
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before quitting, ensure the WAL is flushed and the DB is closed.
// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
if err := dbs.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
}
}()

for {
select {
case <-cancel:
return nil
case _, ok := <-updateDB:
case _, ok := <-hashringChangedChan:
if !ok {
return nil
}

level.Info(logger).Log("msg", "updating DB")
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating Multi DB")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
Expand All @@ -330,6 +342,7 @@ func runReceive(
}
statusProber.Ready()
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
dbUpdatesCompleted.Inc()
dbReady <- struct{}{}
}
}
Expand Down Expand Up @@ -373,7 +386,7 @@ func runReceive(

cancel := make(chan struct{})
g.Add(func() error {
defer close(updateDB)
defer close(hashringChangedChan)
for {
select {
case h, ok := <-updates:
Expand All @@ -384,7 +397,7 @@ func runReceive(
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
updateDB <- struct{}{}
hashringChangedChan <- struct{}{}
case <-cancel:
return nil
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
}

func TestReceiveQuorum(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

defer leaktest.CheckTimeout(t, 10*time.Second)()
appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }
commitErrFn := func() error { return errors.New("failed to commit") }
Expand Down Expand Up @@ -521,8 +520,7 @@ func TestReceiveQuorum(t *testing.T) {
}

func TestReceiveWithConsistencyDelay(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

defer leaktest.CheckTimeout(t, 10*time.Second)()
appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }
commitErrFn := func() error { return errors.New("failed to commit") }
Expand Down
54 changes: 30 additions & 24 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func NewMultiTSDB(

type tenant struct {
readyS *ReadyStorage
tsdb *tsdb.DB
storeTSDB *store.TSDBStore
ship *shipper.Shipper

Expand Down Expand Up @@ -100,16 +99,9 @@ func (t *tenant) shipper() *shipper.Shipper {
return t.ship
}

// db returns the tenant's tsdb field, read while holding the tenant's read lock.
func (t *tenant) db() *tsdb.DB {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.tsdb
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.tsdb = tenantTSDB
t.storeTSDB = storeTSDB
t.ship = ship
t.mtx.Unlock()
Expand Down Expand Up @@ -148,17 +140,17 @@ func (t *MultiTSDB) Flush() error {
errmtx := &sync.Mutex{}
merr := terrors.MultiError{}
wg := &sync.WaitGroup{}
for _, tenant := range t.tenants {
db := tenant.db()
for id, tenant := range t.tenants {
db := tenant.readyStorage().Get()
if db == nil {
level.Error(t.logger).Log("msg", "flushing TSDB failed; not ready", "tenant", id)
continue
}

level.Info(t.logger).Log("msg", "flushing TSDB", "tenant", id)
wg.Add(1)
go func() {
head := db.Head()
mint, maxt := head.MinTime(), head.MaxTime()
if err := db.CompactHead(tsdb.NewRangeHead(head, mint, maxt-1)); err != nil {
if err := db.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime()-1)); err != nil {
errmtx.Lock()
merr.Add(err)
errmtx.Unlock()
Expand All @@ -171,7 +163,28 @@ func (t *MultiTSDB) Flush() error {
return merr.Err()
}

// Close walks every registered tenant while holding the MultiTSDB write lock
// and closes the TSDB obtained from the tenant's ready storage. A tenant whose
// ready storage yields no DB is logged at error level and skipped. The result
// of each DB close is added to a MultiError, whose Err() is returned.
func (t *MultiTSDB) Close() error {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	closeErrs := terrors.MultiError{}
	for tenantID, tn := range t.tenants {
		if tenantDB := tn.readyStorage().Get(); tenantDB != nil {
			level.Info(t.logger).Log("msg", "closing TSDB", "tenant", tenantID)
			closeErrs.Add(tenantDB.Close())
			continue
		}
		// Nothing to close for this tenant; report it and move on.
		level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", tenantID)
	}
	return closeErrs.Err()
}

func (t *MultiTSDB) Sync(ctx context.Context) error {
if t.bucket == nil {
return errors.New("bucket is not specified, Sync should not be invoked")
}

t.mtx.RLock()
defer t.mtx.RUnlock()

Expand All @@ -184,7 +197,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
if s == nil {
continue
}

wg.Add(1)
go func() {
if uploaded, err := s.Sync(ctx); err != nil {
Expand All @@ -195,7 +207,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
wg.Done()
}()
}

wg.Wait()
return merr.Err()
}
Expand All @@ -219,6 +230,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
dataDir := path.Join(t.dataDir, tenantID)

level.Info(logger).Log("msg", "opening TSDB")
opts := *t.tsdbOpts
s, err := tsdb.Open(
dataDir,
Expand All @@ -232,7 +244,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.mtx.Unlock()
return err
}

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
Expand All @@ -245,16 +256,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.allowOutOfOrderUpload,
)
}
tenant.set(store.NewTSDBStore(
logger,
reg,
s,
component.Receive,
lbls,
), s, ship)

tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship)
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}

func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) {
// Fast path, as creating tenants is a very rare operation.
t.mtx.RLock()
Expand Down
12 changes: 4 additions & 8 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package receive

import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
Expand All @@ -25,13 +24,12 @@ import (
)

func TestMultiTSDB(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

defer leaktest.CheckTimeout(t, 10*time.Second)()
dir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

logger := log.NewNopLogger()
logger := log.NewLogfmtLogger(os.Stderr)
t.Run("run fresh", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
Expand All @@ -45,7 +43,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
)
defer testutil.Ok(t, m.Flush())
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())
Expand Down Expand Up @@ -112,7 +110,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
)
defer testutil.Ok(t, m.Flush())
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())
Expand Down Expand Up @@ -202,13 +200,11 @@ Outer:
if !ok {
break Outer
}
fmt.Println(r[0].String())
testutil.Equals(t, expectedFooResp, r)
case r, ok := <-respBar:
if !ok {
break Outer
}
fmt.Println(r[0].String())
testutil.Equals(t, expectedBarResp, r)
}
}
Expand Down