Skip to content

Commit 11f34d3

Browse files
committed
receiver: avoid race of hashring
Signed-off-by: YaoZengzeng <yaozengzeng@zju.edu.cn>
1 parent 4cf32d0 commit 11f34d3

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

pkg/receive/handler.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net"
1010
"net/http"
1111
"strconv"
12+
"sync"
1213
"sync/atomic"
1314

1415
"github.com/go-kit/kit/log"
@@ -46,10 +47,12 @@ type Handler struct {
4647
logger log.Logger
4748
receiver *Writer
4849
router *route.Router
49-
hashring Hashring
5050
options *Options
5151
listener net.Listener
5252

53+
mtx sync.RWMutex
54+
hashring Hashring
55+
5356
// Metrics
5457
requestDuration *prometheus.HistogramVec
5558
requestsTotal *prometheus.CounterVec
@@ -58,7 +61,6 @@ type Handler struct {
5861

5962
// These fields are uint32 rather than boolean to be able to use atomic functions.
6063
storageReady uint32
61-
hashringReady uint32
6264
}
6365

6466
func NewHandler(logger log.Logger, o *Options) *Handler {
@@ -140,20 +142,19 @@ func (h *Handler) StorageReady() {
140142
// Hashring sets the hashring for the handler and marks the hashring as ready.
141143
// If the hashring is nil, then the hashring is marked as not ready.
142144
func (h *Handler) Hashring(hashring Hashring) {
143-
if hashring == nil {
144-
atomic.StoreUint32(&h.hashringReady, 0)
145-
h.hashring = nil
146-
return
147-
}
145+
h.mtx.Lock()
146+
defer h.mtx.Unlock()
147+
148148
h.hashring = hashring
149-
atomic.StoreUint32(&h.hashringReady, 1)
150149
}
151150

152151
// Verifies whether the server is ready or not.
153152
func (h *Handler) isReady() bool {
154153
sr := atomic.LoadUint32(&h.storageReady)
155-
hr := atomic.LoadUint32(&h.hashringReady)
156-
return sr > 0 && hr > 0
154+
h.mtx.RLock()
155+
hr := h.hashring != nil
156+
h.mtx.RUnlock()
157+
return sr > 0 && hr
157158
}
158159

159160
// Checks if server is ready, calls f if it is, returns 503 if it is not.
@@ -275,6 +276,15 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) {
275276
func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *prompb.WriteRequest) error {
276277
wreqs := make(map[string]*prompb.WriteRequest)
277278
replicas := make(map[string]replica)
279+
280+
// It is possible that hashring is ready in testReady() but unready now,
281+
// so need to lock here.
282+
h.mtx.RLock()
283+
if h.hashring == nil {
284+
h.mtx.RUnlock()
285+
return errors.New("hashring is not ready")
286+
}
287+
278288
// Batch all of the time series in the write request
279289
// into several smaller write requests that are
280290
// grouped by target endpoint. This ensures that
@@ -285,6 +295,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
285295
for i := range wreq.Timeseries {
286296
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
287297
if err != nil {
298+
h.mtx.RUnlock()
288299
return err
289300
}
290301
if _, ok := wreqs[endpoint]; !ok {
@@ -294,6 +305,7 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
294305
wr := wreqs[endpoint]
295306
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
296307
}
308+
h.mtx.RUnlock()
297309

298310
return h.parallelizeRequests(ctx, tenant, replicas, wreqs)
299311
}
@@ -400,14 +412,25 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
400412
wreqs := make(map[string]*prompb.WriteRequest)
401413
replicas := make(map[string]replica)
402414
var i uint64
415+
416+
// It is possible that hashring is ready in testReady() but unready now,
417+
// so need to lock here.
418+
h.mtx.RLock()
419+
if h.hashring == nil {
420+
h.mtx.RUnlock()
421+
return errors.New("hashring is not ready")
422+
}
423+
403424
for i = 0; i < h.options.ReplicationFactor; i++ {
404425
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[0], i)
405426
if err != nil {
427+
h.mtx.RUnlock()
406428
return err
407429
}
408430
wreqs[endpoint] = wreq
409431
replicas[endpoint] = replica{i, true}
410432
}
433+
h.mtx.RUnlock()
411434

412435
err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
413436
if errs, ok := err.(terrors.MultiError); ok {

0 commit comments

Comments
 (0)