Skip to content

Commit ff67696

Browse files
*: fix memory leak introduced by time.After (tikv#6720)
close tikv#6719 Signed-off-by: lhy1024 <admin@liudos.us> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 05f71e0 commit ff67696

File tree

21 files changed

+253
-53
lines changed

21 files changed

+253
-53
lines changed

client/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,14 +526,16 @@ func newClientWithKeyspaceName(
526526

527527
func (c *client) initRetry(f func(s string) error, str string) error {
528528
var err error
529+
ticker := time.NewTicker(time.Second)
530+
defer ticker.Stop()
529531
for i := 0; i < c.option.maxRetryTimes; i++ {
530532
if err = f(str); err == nil {
531533
return nil
532534
}
533535
select {
534536
case <-c.ctx.Done():
535537
return err
536-
case <-time.After(time.Second):
538+
case <-ticker.C:
537539
}
538540
}
539541
return errors.WithStack(err)

client/pd_service_discovery.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,16 @@ func (c *pdServiceDiscovery) Init() error {
206206

207207
func (c *pdServiceDiscovery) initRetry(f func() error) error {
208208
var err error
209+
ticker := time.NewTicker(time.Second)
210+
defer ticker.Stop()
209211
for i := 0; i < c.option.maxRetryTimes; i++ {
210212
if err = f(); err == nil {
211213
return nil
212214
}
213215
select {
214216
case <-c.ctx.Done():
215217
return err
216-
case <-time.After(time.Second):
218+
case <-ticker.C:
217219
}
218220
}
219221
return errors.WithStack(err)

client/resource_manager_client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
389389
err error
390390
stream rmpb.ResourceManager_AcquireTokenBucketsClient
391391
)
392+
ticker := time.NewTicker(retryInterval)
393+
defer ticker.Stop()
392394
for i := 0; i < maxRetryTimes; i++ {
393395
cc, err := c.resourceManagerClient()
394396
if err != nil {
@@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
406408
select {
407409
case <-ctx.Done():
408410
return err
409-
case <-time.After(retryInterval):
411+
case <-ticker.C:
410412
}
411413
}
412414
return err

client/timerpool/pool.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2020 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
6+
7+
package timerpool
8+
9+
import (
10+
"sync"
11+
"time"
12+
)
13+
14+
// GlobalTimerPool is a global pool for reusing *time.Timer.
15+
var GlobalTimerPool TimerPool
16+
17+
// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
18+
type TimerPool struct {
19+
pool sync.Pool
20+
}
21+
22+
// Get returns a timer with a given duration.
23+
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
24+
if v := tp.pool.Get(); v != nil {
25+
timer := v.(*time.Timer)
26+
timer.Reset(d)
27+
return timer
28+
}
29+
return time.NewTimer(d)
30+
}
31+
32+
// Put tries to call timer.Stop() before putting it back into pool,
33+
// if the timer.Stop() returns false (it has either already expired or been stopped),
34+
// have a shot at draining the channel with residual time if there is one.
35+
func (tp *TimerPool) Put(timer *time.Timer) {
36+
if !timer.Stop() {
37+
select {
38+
case <-timer.C:
39+
default:
40+
}
41+
}
42+
tp.pool.Put(timer)
43+
}

client/timerpool/pool_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2020 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
6+
7+
package timerpool
8+
9+
import (
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestTimerPool(t *testing.T) {
15+
var tp TimerPool
16+
17+
for i := 0; i < 100; i++ {
18+
timer := tp.Get(20 * time.Millisecond)
19+
20+
select {
21+
case <-timer.C:
22+
t.Errorf("timer expired too early")
23+
continue
24+
default:
25+
}
26+
27+
select {
28+
case <-time.After(100 * time.Millisecond):
29+
t.Errorf("timer didn't expire on time")
30+
case <-timer.C:
31+
}
32+
33+
tp.Put(timer)
34+
}
35+
}
36+
// timeout is the timer duration used by the benchmarks below.
const timeout = 10 * time.Millisecond
38+
39+
func BenchmarkTimerUtilization(b *testing.B) {
40+
b.Run("TimerWithPool", func(b *testing.B) {
41+
for i := 0; i < b.N; i++ {
42+
t := GlobalTimerPool.Get(timeout)
43+
GlobalTimerPool.Put(t)
44+
}
45+
})
46+
b.Run("TimerWithoutPool", func(b *testing.B) {
47+
for i := 0; i < b.N; i++ {
48+
t := time.NewTimer(timeout)
49+
t.Stop()
50+
}
51+
})
52+
}
53+
54+
func BenchmarkTimerPoolParallel(b *testing.B) {
55+
b.RunParallel(func(pb *testing.PB) {
56+
for pb.Next() {
57+
t := GlobalTimerPool.Get(timeout)
58+
GlobalTimerPool.Put(t)
59+
}
60+
})
61+
}
62+
63+
func BenchmarkTimerNativeParallel(b *testing.B) {
64+
b.RunParallel(func(pb *testing.PB) {
65+
for pb.Next() {
66+
t := time.NewTimer(timeout)
67+
t.Stop()
68+
}
69+
})
70+
}

client/tso_dispatcher.go

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/log"
2828
"github.com/tikv/pd/client/errs"
2929
"github.com/tikv/pd/client/grpcutil"
30+
"github.com/tikv/pd/client/timerpool"
3031
"github.com/tikv/pd/client/tsoutil"
3132
"go.uber.org/zap"
3233
"google.golang.org/grpc"
@@ -139,11 +140,24 @@ func (c *tsoClient) updateTSODispatcher() {
139140
}
140141

141142
type deadline struct {
142-
timer <-chan time.Time
143+
timer *time.Timer
143144
done chan struct{}
144145
cancel context.CancelFunc
145146
}
146147

148+
func newTSDeadline(
149+
timeout time.Duration,
150+
done chan struct{},
151+
cancel context.CancelFunc,
152+
) *deadline {
153+
timer := timerpool.GlobalTimerPool.Get(timeout)
154+
return &deadline{
155+
timer: timer,
156+
done: done,
157+
cancel: cancel,
158+
}
159+
}
160+
147161
func (c *tsoClient) tsCancelLoop() {
148162
defer c.wg.Done()
149163

@@ -172,19 +186,21 @@ func (c *tsoClient) tsCancelLoop() {
172186

173187
func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
174188
if _, exist := c.tsDeadline.Load(dcLocation); !exist {
175-
tsDeadlineCh := make(chan deadline, 1)
189+
tsDeadlineCh := make(chan *deadline, 1)
176190
c.tsDeadline.Store(dcLocation, tsDeadlineCh)
177-
go func(dc string, tsDeadlineCh <-chan deadline) {
191+
go func(dc string, tsDeadlineCh <-chan *deadline) {
178192
for {
179193
select {
180194
case d := <-tsDeadlineCh:
181195
select {
182-
case <-d.timer:
196+
case <-d.timer.C:
183197
log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
184198
d.cancel()
199+
timerpool.GlobalTimerPool.Put(d.timer)
185200
case <-d.done:
186-
continue
201+
timerpool.GlobalTimerPool.Put(d.timer)
187202
case <-ctx.Done():
203+
timerpool.GlobalTimerPool.Put(d.timer)
188204
return
189205
}
190206
case <-ctx.Done():
@@ -234,6 +250,8 @@ func (c *tsoClient) checkAllocator(
234250
}()
235251
cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
236252
healthCli := healthpb.NewHealthClient(cc)
253+
ticker := time.NewTicker(time.Second)
254+
defer ticker.Stop()
237255
for {
238256
// the pd/allocator leader change, we need to re-establish the stream
239257
if u != url {
@@ -259,7 +277,7 @@ func (c *tsoClient) checkAllocator(
259277
select {
260278
case <-dispatcherCtx.Done():
261279
return
262-
case <-time.After(time.Second):
280+
case <-ticker.C:
263281
// To ensure we can get the latest allocator leader
264282
// and once the leader is changed, we can exit this function.
265283
_, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -366,6 +384,7 @@ func (c *tsoClient) handleDispatcher(
366384

367385
// Loop through each batch of TSO requests and send them for processing.
368386
streamLoopTimer := time.NewTimer(c.option.timeout)
387+
defer streamLoopTimer.Stop()
369388
tsoBatchLoop:
370389
for {
371390
select {
@@ -389,6 +408,15 @@ tsoBatchLoop:
389408
if maxBatchWaitInterval >= 0 {
390409
tbc.adjustBestBatchSize()
391410
}
411+
// Stop the timer if it's not stopped.
412+
if !streamLoopTimer.Stop() {
413+
select {
414+
case <-streamLoopTimer.C: // try to drain from the channel
415+
default:
416+
}
417+
}
418+
// We need be careful here, see more details in the comments of Timer.Reset.
419+
// https://pkg.go.dev/time@master#Timer.Reset
392420
streamLoopTimer.Reset(c.option.timeout)
393421
// Choose a stream to send the TSO gRPC request.
394422
streamChoosingLoop:
@@ -403,16 +431,20 @@ tsoBatchLoop:
403431
if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
404432
continue streamChoosingLoop
405433
}
434+
timer := time.NewTimer(retryInterval)
406435
select {
407436
case <-dispatcherCtx.Done():
437+
timer.Stop()
408438
return
409439
case <-streamLoopTimer.C:
410440
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
411441
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
412442
c.svcDiscovery.ScheduleCheckMemberChanged()
413443
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
444+
timer.Stop()
414445
continue tsoBatchLoop
415-
case <-time.After(retryInterval):
446+
case <-timer.C:
447+
timer.Stop()
416448
continue streamChoosingLoop
417449
}
418450
}
@@ -429,11 +461,7 @@ tsoBatchLoop:
429461
}
430462
}
431463
done := make(chan struct{})
432-
dl := deadline{
433-
timer: time.After(c.option.timeout),
434-
done: done,
435-
cancel: cancel,
436-
}
464+
dl := newTSDeadline(c.option.timeout, done, cancel)
437465
tsDeadlineCh, ok := c.tsDeadline.Load(dc)
438466
for !ok || tsDeadlineCh == nil {
439467
c.scheduleCheckTSDeadline()
@@ -443,7 +471,7 @@ tsoBatchLoop:
443471
select {
444472
case <-dispatcherCtx.Done():
445473
return
446-
case tsDeadlineCh.(chan deadline) <- dl:
474+
case tsDeadlineCh.(chan *deadline) <- dl:
447475
}
448476
opts = extractSpanReference(tbc, opts[:0])
449477
err = c.processRequests(stream, dc, tbc, opts)
@@ -558,6 +586,8 @@ func (c *tsoClient) tryConnectToTSO(
558586
}
559587
// retry several times before falling back to the follower when the network problem happens
560588

589+
ticker := time.NewTicker(retryInterval)
590+
defer ticker.Stop()
561591
for i := 0; i < maxRetryTimes; i++ {
562592
c.svcDiscovery.ScheduleCheckMemberChanged()
563593
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
@@ -587,7 +617,7 @@ func (c *tsoClient) tryConnectToTSO(
587617
select {
588618
case <-dispatcherCtx.Done():
589619
return err
590-
case <-time.After(retryInterval):
620+
case <-ticker.C:
591621
}
592622
}
593623

client/tso_service_discovery.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,14 +209,16 @@ func (c *tsoServiceDiscovery) retry(
209209
maxRetryTimes int, retryInterval time.Duration, f func() error,
210210
) error {
211211
var err error
212+
ticker := time.NewTicker(retryInterval)
213+
defer ticker.Stop()
212214
for i := 0; i < maxRetryTimes; i++ {
213215
if err = f(); err == nil {
214216
return nil
215217
}
216218
select {
217219
case <-c.ctx.Done():
218220
return err
219-
case <-time.After(retryInterval):
221+
case <-ticker.C:
220222
}
221223
}
222224
return errors.WithStack(err)
@@ -245,11 +247,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() {
245247

246248
ctx, cancel := context.WithCancel(c.ctx)
247249
defer cancel()
250+
ticker := time.NewTicker(memberUpdateInterval)
251+
defer ticker.Stop()
248252

249253
for {
250254
select {
251255
case <-c.checkMembershipCh:
252-
case <-time.After(memberUpdateInterval):
256+
case <-ticker.C:
253257
case <-ctx.Done():
254258
log.Info("[tso] exit check member loop")
255259
return

client/tso_stream.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,12 @@ func (b *tsoTSOStreamBuilder) build(
8787
}
8888

8989
func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
90+
timer := time.NewTimer(timeout)
91+
defer timer.Stop()
9092
select {
9193
case <-done:
9294
return
93-
case <-time.After(timeout):
95+
case <-timer.C:
9496
cancel()
9597
case <-ctx.Done():
9698
}

0 commit comments

Comments
 (0)