Skip to content

Commit 9e281f6

Browse files
committed
receive: Added more observability, fixed leaktest, to actually check leaks ):
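Reason: the deferred leaktest.CheckTimeout(...) calls in the tests were missing the trailing (), so the check function they return was never invoked and leaks were never actually checked; we probably need a linter for this. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>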
1 parent 789ef71 commit 9e281f6

File tree

4 files changed

+60
-47
lines changed

4 files changed

+60
-47
lines changed

cmd/thanos/receive.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
opentracing "github.com/opentracing/opentracing-go"
1919
"github.com/pkg/errors"
2020
"github.com/prometheus/client_golang/prometheus"
21+
"github.com/prometheus/client_golang/prometheus/promauto"
2122
"github.com/prometheus/prometheus/pkg/labels"
2223
"github.com/prometheus/prometheus/tsdb"
2324
kingpin "gopkg.in/alecthomas/kingpin.v2"
@@ -205,8 +206,7 @@ func runReceive(
205206
allowOutOfOrderUpload bool,
206207
) error {
207208
logger = log.With(logger, "component", "receive")
208-
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")
209-
209+
level.Warn(logger).Log("msg", "setting up receive")
210210
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
211211
if err != nil {
212212
return err
@@ -285,38 +285,50 @@ func runReceive(
285285

286286
// dbReady signals when TSDB is ready and the Store gRPC server can start.
287287
dbReady := make(chan struct{}, 1)
288-
// updateDB signals when TSDB needs to be flushed and updated.
289-
updateDB := make(chan struct{}, 1)
288+
// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change.
289+
hashringChangedChan := make(chan struct{}, 1)
290290
// uploadC signals when new blocks should be uploaded.
291291
uploadC := make(chan struct{}, 1)
292292
// uploadDone signals when uploading has finished.
293293
uploadDone := make(chan struct{}, 1)
294294

295295
level.Debug(logger).Log("msg", "setting up tsdb")
296296
{
297-
// TSDB.
297+
dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
298+
Name: "thanos_receive_multi_db_updates_attempted_total",
299+
Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
300+
})
301+
dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
302+
Name: "thanos_receive_multi_db_updates_completed_total",
303+
Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
304+
})
305+
306+
// TSDBs reload logic, listening on hashring changes.
298307
cancel := make(chan struct{})
299308
g.Add(func() error {
300309
defer close(dbReady)
301310
defer close(uploadC)
302311

303-
// Before quitting, ensure the WAL is flushed and the DB is closed.
312+
// Before quitting, ensure the WAL is flushed and the DBs are closed.
304313
defer func() {
305314
if err := dbs.Flush(); err != nil {
306315
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
307316
}
317+
if err := dbs.Close(); err != nil {
318+
level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
319+
}
308320
}()
309321

310322
for {
311323
select {
312324
case <-cancel:
313325
return nil
314-
case _, ok := <-updateDB:
326+
case _, ok := <-hashringChangedChan:
315327
if !ok {
316328
return nil
317329
}
318-
319-
level.Info(logger).Log("msg", "updating DB")
330+
dbUpdatesStarted.Inc()
331+
level.Info(logger).Log("msg", "updating Multi DB")
320332

321333
if err := dbs.Flush(); err != nil {
322334
return errors.Wrap(err, "flushing storage")
@@ -330,6 +342,7 @@ func runReceive(
330342
}
331343
statusProber.Ready()
332344
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
345+
dbUpdatesCompleted.Inc()
333346
dbReady <- struct{}{}
334347
}
335348
}
@@ -373,7 +386,7 @@ func runReceive(
373386

374387
cancel := make(chan struct{})
375388
g.Add(func() error {
376-
defer close(updateDB)
389+
defer close(hashringChangedChan)
377390
for {
378391
select {
379392
case h, ok := <-updates:
@@ -384,7 +397,7 @@ func runReceive(
384397
msg := "hashring has changed; server is not ready to receive web requests."
385398
statusProber.NotReady(errors.New(msg))
386399
level.Info(logger).Log("msg", msg)
387-
updateDB <- struct{}{}
400+
hashringChangedChan <- struct{}{}
388401
case <-cancel:
389402
return nil
390403
}

pkg/receive/handler_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
183183
}
184184

185185
func TestReceiveQuorum(t *testing.T) {
186-
defer leaktest.CheckTimeout(t, 10*time.Second)
187-
186+
defer leaktest.CheckTimeout(t, 10*time.Second)()
188187
appenderErrFn := func() error { return errors.New("failed to get appender") }
189188
conflictErrFn := func() error { return storage.ErrOutOfBounds }
190189
commitErrFn := func() error { return errors.New("failed to commit") }
@@ -521,8 +520,7 @@ func TestReceiveQuorum(t *testing.T) {
521520
}
522521

523522
func TestReceiveWithConsistencyDelay(t *testing.T) {
524-
defer leaktest.CheckTimeout(t, 10*time.Second)
525-
523+
defer leaktest.CheckTimeout(t, 10*time.Second)()
526524
appenderErrFn := func() error { return errors.New("failed to get appender") }
527525
conflictErrFn := func() error { return storage.ErrOutOfBounds }
528526
commitErrFn := func() error { return errors.New("failed to commit") }

pkg/receive/multitsdb.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ func NewMultiTSDB(
7070

7171
type tenant struct {
7272
readyS *ReadyStorage
73-
tsdb *tsdb.DB
7473
storeTSDB *store.TSDBStore
7574
ship *shipper.Shipper
7675

@@ -100,16 +99,9 @@ func (t *tenant) shipper() *shipper.Shipper {
10099
return t.ship
101100
}
102101

103-
func (t *tenant) db() *tsdb.DB {
104-
t.mtx.RLock()
105-
defer t.mtx.RUnlock()
106-
return t.tsdb
107-
}
108-
109102
func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper) {
110103
t.readyS.Set(tenantTSDB)
111104
t.mtx.Lock()
112-
t.tsdb = tenantTSDB
113105
t.storeTSDB = storeTSDB
114106
t.ship = ship
115107
t.mtx.Unlock()
@@ -148,17 +140,17 @@ func (t *MultiTSDB) Flush() error {
148140
errmtx := &sync.Mutex{}
149141
merr := terrors.MultiError{}
150142
wg := &sync.WaitGroup{}
151-
for _, tenant := range t.tenants {
152-
db := tenant.db()
143+
for id, tenant := range t.tenants {
144+
db := tenant.readyStorage().Get()
153145
if db == nil {
146+
level.Error(t.logger).Log("msg", "flushing TSDB failed; not ready", "tenant", id)
154147
continue
155148
}
156-
149+
level.Info(t.logger).Log("msg", "flushing TSDB", "tenant", id)
157150
wg.Add(1)
158151
go func() {
159152
head := db.Head()
160-
mint, maxt := head.MinTime(), head.MaxTime()
161-
if err := db.CompactHead(tsdb.NewRangeHead(head, mint, maxt-1)); err != nil {
153+
if err := db.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime()-1)); err != nil {
162154
errmtx.Lock()
163155
merr.Add(err)
164156
errmtx.Unlock()
@@ -171,7 +163,28 @@ func (t *MultiTSDB) Flush() error {
171163
return merr.Err()
172164
}
173165

166+
// Close closes the TSDB of every tenant currently registered in t.tenants.
// It holds the write lock on t.mtx for the whole call. Tenants whose ready
// storage returns a nil DB are logged at error level and skipped rather than
// treated as a failure. The results of the individual db.Close() calls are
// collected into a MultiError and returned via merr.Err().
func (t *MultiTSDB) Close() error {
167+
t.mtx.Lock()
168+
defer t.mtx.Unlock()
169+
170+
merr := terrors.MultiError{}
171+
for id, tenant := range t.tenants {
172+
db := tenant.readyStorage().Get()
173+
if db == nil {
174+
// No DB is set for this tenant, so there is nothing to close; log and move on.
level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id)
175+
continue
176+
}
177+
level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id)
178+
// NOTE(review): presumably MultiError.Add ignores nil errors, so successful closes do not produce an error — confirm.
merr.Add(db.Close())
179+
}
180+
return merr.Err()
181+
}
182+
174183
func (t *MultiTSDB) Sync(ctx context.Context) error {
184+
if t.bucket == nil {
185+
return errors.New("bucket is not specified, Sync should not be invoked")
186+
}
187+
175188
t.mtx.RLock()
176189
defer t.mtx.RUnlock()
177190

@@ -184,7 +197,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
184197
if s == nil {
185198
continue
186199
}
187-
188200
wg.Add(1)
189201
go func() {
190202
if uploaded, err := s.Sync(ctx); err != nil {
@@ -195,7 +207,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
195207
wg.Done()
196208
}()
197209
}
198-
199210
wg.Wait()
200211
return merr.Err()
201212
}
@@ -219,6 +230,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
219230
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
220231
dataDir := path.Join(t.dataDir, tenantID)
221232

233+
level.Info(logger).Log("msg", "opening TSDB")
222234
opts := *t.tsdbOpts
223235
s, err := tsdb.Open(
224236
dataDir,
@@ -232,7 +244,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
232244
t.mtx.Unlock()
233245
return err
234246
}
235-
236247
var ship *shipper.Shipper
237248
if t.bucket != nil {
238249
ship = shipper.New(
@@ -245,16 +256,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
245256
t.allowOutOfOrderUpload,
246257
)
247258
}
248-
tenant.set(store.NewTSDBStore(
249-
logger,
250-
reg,
251-
s,
252-
component.Receive,
253-
lbls,
254-
), s, ship)
255-
259+
tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship)
260+
level.Info(logger).Log("msg", "TSDB is now ready")
256261
return nil
257262
}
263+
258264
func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) {
259265
// Fast path, as creating tenants is a very rare operation.
260266
t.mtx.RLock()

pkg/receive/multitsdb_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package receive
55

66
import (
77
"context"
8-
"fmt"
98
"io/ioutil"
109
"os"
1110
"testing"
@@ -25,13 +24,12 @@ import (
2524
)
2625

2726
func TestMultiTSDB(t *testing.T) {
28-
defer leaktest.CheckTimeout(t, 10*time.Second)
29-
27+
defer leaktest.CheckTimeout(t, 10*time.Second)()
3028
dir, err := ioutil.TempDir("", "test")
3129
testutil.Ok(t, err)
3230
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
3331

34-
logger := log.NewNopLogger()
32+
logger := log.NewLogfmtLogger(os.Stderr)
3533
t.Run("run fresh", func(t *testing.T) {
3634
m := NewMultiTSDB(
3735
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
@@ -45,7 +43,7 @@ func TestMultiTSDB(t *testing.T) {
4543
nil,
4644
false,
4745
)
48-
defer testutil.Ok(t, m.Flush())
46+
defer func() { testutil.Ok(t, m.Close()) }()
4947

5048
testutil.Ok(t, m.Flush())
5149
testutil.Ok(t, m.Open())
@@ -112,7 +110,7 @@ func TestMultiTSDB(t *testing.T) {
112110
nil,
113111
false,
114112
)
115-
defer testutil.Ok(t, m.Flush())
113+
defer func() { testutil.Ok(t, m.Close()) }()
116114

117115
testutil.Ok(t, m.Flush())
118116
testutil.Ok(t, m.Open())
@@ -202,13 +200,11 @@ Outer:
202200
if !ok {
203201
break Outer
204202
}
205-
fmt.Println(r[0].String())
206203
testutil.Equals(t, expectedFooResp, r)
207204
case r, ok := <-respBar:
208205
if !ok {
209206
break Outer
210207
}
211-
fmt.Println(r[0].String())
212208
testutil.Equals(t, expectedBarResp, r)
213209
}
214210
}

0 commit comments

Comments
 (0)