Skip to content

Commit 4baca25

Browse files
committed
Prevent time.Timer memory leak by using a singleton timer in bufferedPipes. Fix cbeuw#137
1 parent 39c06a6 commit 4baca25

File tree

2 files changed

+36
-6
lines changed

2 files changed

+36
-6
lines changed

internal/multiplex/datagramBufferedPipe.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ type datagramBufferedPipe struct {
2020
rwCond *sync.Cond
2121
wtTimeout time.Duration
2222
rDeadline time.Time
23+
24+
timer *time.Timer
2325
}
2426

2527
func NewDatagramBufferedPipe() *datagramBufferedPipe {
2628
d := &datagramBufferedPipe{
2729
rwCond: sync.NewCond(&sync.Mutex{}),
30+
timer: time.NewTimer(0),
2831
}
2932
return d
3033
}
@@ -45,7 +48,7 @@ func (d *datagramBufferedPipe) Read(target []byte) (int, error) {
4548
if delta <= 0 {
4649
return 0, ErrTimeout
4750
}
48-
time.AfterFunc(delta, d.rwCond.Broadcast)
51+
d.broadcastAfter(delta)
4952
}
5053

5154
if len(d.pLens) > 0 {
@@ -81,12 +84,12 @@ func (d *datagramBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
8184
}
8285
if d.wtTimeout == 0 {
8386
// if there hasn't been a scheduled broadcast
84-
time.AfterFunc(delta, d.rwCond.Broadcast)
87+
d.broadcastAfter(delta)
8588
}
8689
}
8790
if d.wtTimeout != 0 {
8891
d.rDeadline = time.Now().Add(d.wtTimeout)
89-
time.AfterFunc(d.wtTimeout, d.rwCond.Broadcast)
92+
d.broadcastAfter(d.wtTimeout)
9093
}
9194

9295
if len(d.pLens) > 0 {
@@ -160,3 +163,15 @@ func (d *datagramBufferedPipe) SetWriteToTimeout(t time.Duration) {
160163
d.wtTimeout = t
161164
d.rwCond.Broadcast()
162165
}
166+
167+
func (d *datagramBufferedPipe) broadcastAfter(delta time.Duration) {
168+
// d.rwCond.L must be held, otherwise the following timer operations will race
169+
if !d.timer.Stop() {
170+
<-d.timer.C
171+
}
172+
d.timer.Reset(delta)
173+
go func() {
174+
<-d.timer.C
175+
d.rwCond.Broadcast()
176+
}()
177+
}

internal/multiplex/streamBufferedPipe.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ type streamBufferedPipe struct {
1818
rwCond *sync.Cond
1919
rDeadline time.Time
2020
wtTimeout time.Duration
21+
22+
timer *time.Timer
2123
}
2224

2325
func NewStreamBufferedPipe() *streamBufferedPipe {
2426
p := &streamBufferedPipe{
2527
rwCond: sync.NewCond(&sync.Mutex{}),
28+
timer: time.NewTimer(0),
2629
}
2730
return p
2831
}
@@ -42,7 +45,7 @@ func (p *streamBufferedPipe) Read(target []byte) (int, error) {
4245
if d <= 0 {
4346
return 0, ErrTimeout
4447
}
45-
time.AfterFunc(d, p.rwCond.Broadcast)
48+
p.broadcastAfter(d)
4649
}
4750
if p.buf.Len() > 0 {
4851
break
@@ -72,12 +75,12 @@ func (p *streamBufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
7275
}
7376
if p.wtTimeout == 0 {
7477
// if there hasn't been a scheduled broadcast
75-
time.AfterFunc(d, p.rwCond.Broadcast)
78+
p.broadcastAfter(d)
7679
}
7780
}
7881
if p.wtTimeout != 0 {
7982
p.rDeadline = time.Now().Add(p.wtTimeout)
80-
time.AfterFunc(p.wtTimeout, p.rwCond.Broadcast)
83+
p.broadcastAfter(p.wtTimeout)
8184
}
8285
if p.buf.Len() > 0 {
8386
written, er := p.buf.WriteTo(w)
@@ -139,3 +142,15 @@ func (p *streamBufferedPipe) SetWriteToTimeout(d time.Duration) {
139142
p.wtTimeout = d
140143
p.rwCond.Broadcast()
141144
}
145+
146+
func (p *streamBufferedPipe) broadcastAfter(d time.Duration) {
147+
// p.rwCond.L must be held, otherwise the following timer operations will race
148+
if !p.timer.Stop() {
149+
<-p.timer.C
150+
}
151+
p.timer.Reset(d)
152+
go func() {
153+
<-p.timer.C
154+
p.rwCond.Broadcast()
155+
}()
156+
}

0 commit comments

Comments
 (0)