Skip to content

Commit 2449497

Browse files
committed
util/buffer_pool: Use sync.Pool
The sync.Pool implementation in the Go standard library has improved drastically since the buffer pool used here was first implemented. Using sync.Pool results in a drastic memory reduction as well as simpler buffer_pool code.
1 parent 64b5b1c commit 2449497

File tree

2 files changed

+63
-171
lines changed

2 files changed

+63
-171
lines changed

leveldb/table.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,6 @@ func (t *tOps) remove(fd storage.FileDesc) {
501501
// Closes the table ops instance. It will close all tables,
502502
// regardless still used or not.
503503
func (t *tOps) close() {
504-
t.bpool.Close()
505504
t.cache.Close()
506505
if t.bcache != nil {
507506
t.bcache.CloseWeak()

leveldb/util/buffer_pool.go

Lines changed: 63 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -10,148 +10,76 @@ import (
1010
"fmt"
1111
"sync"
1212
"sync/atomic"
13-
"time"
1413
)
1514

16-
type buffer struct {
17-
b []byte
18-
miss int
19-
}
20-
2115
// BufferPool is a 'buffer pool'.
2216
type BufferPool struct {
23-
pool [6]chan []byte
24-
size [5]uint32
25-
sizeMiss [5]uint32
26-
sizeHalf [5]uint32
27-
baseline [4]int
28-
baseline0 int
29-
30-
mu sync.RWMutex
31-
closed bool
32-
closeC chan struct{}
17+
pool [6]sync.Pool
18+
baseline [5]int
3319

3420
get uint32
3521
put uint32
36-
half uint32
3722
less uint32
3823
equal uint32
3924
greater uint32
4025
miss uint32
4126
}
4227

4328
func (p *BufferPool) poolNum(n int) int {
44-
if n <= p.baseline0 && n > p.baseline0/2 {
45-
return 0
46-
}
4729
for i, x := range p.baseline {
4830
if n <= x {
49-
return i + 1
31+
return i
5032
}
5133
}
52-
return len(p.baseline) + 1
34+
return len(p.baseline)
5335
}
5436

5537
// Get returns buffer with length of n.
5638
func (p *BufferPool) Get(n int) []byte {
5739
if p == nil {
5840
return make([]byte, n)
5941
}
42+
atomic.AddUint32(&p.get, 1)
6043

61-
p.mu.RLock()
62-
defer p.mu.RUnlock()
44+
poolNum := p.poolNum(n)
6345

64-
if p.closed {
65-
return make([]byte, n)
66-
}
46+
b := p.pool[poolNum].Get().(*[]byte)
6747

68-
atomic.AddUint32(&p.get, 1)
48+
if cap(*b) == 0 {
49+
// If we grabbed nothing, increment the miss stats.
50+
atomic.AddUint32(&p.miss, 1)
6951

70-
poolNum := p.poolNum(n)
71-
pool := p.pool[poolNum]
72-
if poolNum == 0 {
73-
// Fast path.
74-
select {
75-
case b := <-pool:
76-
switch {
77-
case cap(b) > n:
78-
if cap(b)-n >= n {
79-
atomic.AddUint32(&p.half, 1)
80-
select {
81-
case pool <- b:
82-
default:
83-
}
84-
return make([]byte, n)
85-
} else {
86-
atomic.AddUint32(&p.less, 1)
87-
return b[:n]
88-
}
89-
case cap(b) == n:
90-
atomic.AddUint32(&p.equal, 1)
91-
return b[:n]
92-
default:
93-
atomic.AddUint32(&p.greater, 1)
94-
}
95-
default:
96-
atomic.AddUint32(&p.miss, 1)
52+
if poolNum == len(p.baseline) {
53+
*b = make([]byte, n)
54+
return *b
9755
}
9856

99-
return make([]byte, n, p.baseline0)
57+
*b = make([]byte, p.baseline[poolNum])
58+
*b = (*b)[:n]
59+
return *b
10060
} else {
101-
sizePtr := &p.size[poolNum-1]
102-
103-
select {
104-
case b := <-pool:
105-
switch {
106-
case cap(b) > n:
107-
if cap(b)-n >= n {
108-
atomic.AddUint32(&p.half, 1)
109-
sizeHalfPtr := &p.sizeHalf[poolNum-1]
110-
if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
111-
atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
112-
atomic.StoreUint32(sizeHalfPtr, 0)
113-
} else {
114-
select {
115-
case pool <- b:
116-
default:
117-
}
118-
}
119-
return make([]byte, n)
120-
} else {
121-
atomic.AddUint32(&p.less, 1)
122-
return b[:n]
123-
}
124-
case cap(b) == n:
125-
atomic.AddUint32(&p.equal, 1)
126-
return b[:n]
127-
default:
128-
atomic.AddUint32(&p.greater, 1)
129-
if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
130-
select {
131-
case pool <- b:
132-
default:
133-
}
134-
}
135-
}
136-
default:
137-
atomic.AddUint32(&p.miss, 1)
61+
// If there is enough capacity in the bytes grabbed, resize the length
62+
// to n and return.
63+
if n < cap(*b) {
64+
atomic.AddUint32(&p.less, 1)
65+
*b = (*b)[:n]
66+
return *b
67+
} else if n == cap(*b) {
68+
atomic.AddUint32(&p.equal, 1)
69+
*b = (*b)[:n]
70+
return *b
71+
} else if n > cap(*b) {
72+
atomic.AddUint32(&p.greater, 1)
13873
}
74+
}
13975

140-
if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
141-
if size == 0 {
142-
atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
143-
} else {
144-
sizeMissPtr := &p.sizeMiss[poolNum-1]
145-
if atomic.AddUint32(sizeMissPtr, 1) == 20 {
146-
atomic.StoreUint32(sizePtr, uint32(n))
147-
atomic.StoreUint32(sizeMissPtr, 0)
148-
}
149-
}
150-
return make([]byte, n)
151-
} else {
152-
return make([]byte, n, size)
153-
}
76+
if poolNum == len(p.baseline) {
77+
*b = make([]byte, n)
78+
return *b
15479
}
80+
*b = make([]byte, p.baseline[poolNum])
81+
*b = (*b)[:n]
82+
return *b
15583
}
15684

15785
// Put adds given buffer to the pool.
@@ -160,83 +88,48 @@ func (p *BufferPool) Put(b []byte) {
16088
return
16189
}
16290

163-
p.mu.RLock()
164-
defer p.mu.RUnlock()
165-
166-
if p.closed {
167-
return
168-
}
91+
poolNum := p.poolNum(cap(b))
16992

17093
atomic.AddUint32(&p.put, 1)
171-
172-
pool := p.pool[p.poolNum(cap(b))]
173-
select {
174-
case pool <- b:
175-
default:
176-
}
177-
178-
}
179-
180-
func (p *BufferPool) Close() {
181-
if p == nil {
182-
return
183-
}
184-
185-
p.mu.Lock()
186-
if !p.closed {
187-
p.closed = true
188-
p.closeC <- struct{}{}
189-
}
190-
p.mu.Unlock()
94+
p.pool[poolNum].Put(&b)
19195
}
19296

19397
func (p *BufferPool) String() string {
19498
if p == nil {
19599
return "<nil>"
196100
}
197-
198-
p.mu.Lock()
199-
defer p.mu.Unlock()
200-
201-
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
202-
p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
203-
}
204-
205-
func (p *BufferPool) drain() {
206-
ticker := time.NewTicker(2 * time.Second)
207-
defer ticker.Stop()
208-
for {
209-
select {
210-
case <-ticker.C:
211-
for _, ch := range p.pool {
212-
select {
213-
case <-ch:
214-
default:
215-
}
216-
}
217-
case <-p.closeC:
218-
close(p.closeC)
219-
for _, ch := range p.pool {
220-
close(ch)
221-
}
222-
return
223-
}
224-
}
101+
return fmt.Sprintf("BufferPool{B·%d G·%d P·%d <·%d =·%d >·%d M·%d}",
102+
p.baseline, p.get, p.put, p.less, p.equal, p.greater, p.miss)
225103
}
226104

227105
// NewBufferPool creates a new initialized 'buffer pool'.
228106
func NewBufferPool(baseline int) *BufferPool {
229107
if baseline <= 0 {
230108
panic("baseline can't be <= 0")
231109
}
232-
p := &BufferPool{
233-
baseline0: baseline,
234-
baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
235-
closeC: make(chan struct{}, 1),
110+
bufPool := &BufferPool{
111+
baseline: [...]int{baseline / 4, baseline / 2, baseline, baseline * 2, baseline * 4},
112+
pool: [6]sync.Pool{
113+
sync.Pool{
114+
New: func() interface{} { return new([]byte) },
115+
},
116+
sync.Pool{
117+
New: func() interface{} { return new([]byte) },
118+
},
119+
sync.Pool{
120+
New: func() interface{} { return new([]byte) },
121+
},
122+
sync.Pool{
123+
New: func() interface{} { return new([]byte) },
124+
},
125+
sync.Pool{
126+
New: func() interface{} { return new([]byte) },
127+
},
128+
sync.Pool{
129+
New: func() interface{} { return new([]byte) },
130+
},
131+
},
236132
}
237-
for i, cap := range []int{2, 2, 4, 4, 2, 1} {
238-
p.pool[i] = make(chan []byte, cap)
239-
}
240-
go p.drain()
241-
return p
133+
134+
return bufPool
242135
}

0 commit comments

Comments (0)