Skip to content

Commit 079c29c

Browse files
authored
Merge pull request syndtr#367 from kcalvinalvin/syncpool-bufferpool
util/buffer_pool, db: Reduce memory allocation
2 parents 64b5b1c + 71b98dd commit 079c29c

File tree

7 files changed

+93
-180
lines changed

7 files changed

+93
-180
lines changed

leveldb/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func recoverTable(s *session, o *opt.Options) error {
319319
}()
320320

321321
// Copy entries.
322-
tw := table.NewWriter(writer, o)
322+
tw := table.NewWriter(writer, o, nil, 0)
323323
for iter.Next() {
324324
key := iter.Key()
325325
if validInternalKey(key) {

leveldb/db_compaction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
389389

390390
// Create new table.
391391
var err error
392-
b.tw, err = b.s.tops.create()
392+
b.tw, err = b.s.tops.create(b.tableSize)
393393
if err != nil {
394394
return err
395395
}

leveldb/db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2539,7 +2539,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
25392539
value = bytes.Repeat([]byte{'0'}, 100)
25402540
)
25412541
for i := 0; i < 2; i++ {
2542-
tw, err := s.tops.create()
2542+
tw, err := s.tops.create(0)
25432543
if err != nil {
25442544
t.Fatal(err)
25452545
}

leveldb/table.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ type tOps struct {
366366
}
367367

368368
// Creates an empty table and returns table writer.
369-
func (t *tOps) create() (*tWriter, error) {
369+
func (t *tOps) create(tSize int) (*tWriter, error) {
370370
fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
371371
fw, err := t.s.stor.Create(fd)
372372
if err != nil {
@@ -376,13 +376,13 @@ func (t *tOps) create() (*tWriter, error) {
376376
t: t,
377377
fd: fd,
378378
w: fw,
379-
tw: table.NewWriter(fw, t.s.o.Options),
379+
tw: table.NewWriter(fw, t.s.o.Options, t.bpool, tSize),
380380
}, nil
381381
}
382382

383383
// Builds table from src iterator.
384384
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
385-
w, err := t.create()
385+
w, err := t.create(0)
386386
if err != nil {
387387
return
388388
}
@@ -501,7 +501,6 @@ func (t *tOps) remove(fd storage.FileDesc) {
501501
// Closes the table ops instance. It will close all tables,
502502
// regadless still used or not.
503503
func (t *tOps) close() {
504-
t.bpool.Close()
505504
t.cache.Close()
506505
if t.bcache != nil {
507506
t.bcache.CloseWeak()

leveldb/table/table_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var _ = testutil.Defer(func() {
4747
)
4848

4949
// Building the table.
50-
tw := NewWriter(buf, o)
50+
tw := NewWriter(buf, o, nil, 0)
5151
tw.Append([]byte("k01"), []byte("hello"))
5252
tw.Append([]byte("k02"), []byte("hello2"))
5353
tw.Append([]byte("k03"), bytes.Repeat([]byte{'x'}, 10000))
@@ -90,7 +90,7 @@ var _ = testutil.Defer(func() {
9090
buf := &bytes.Buffer{}
9191

9292
// Building the table.
93-
tw := NewWriter(buf, o)
93+
tw := NewWriter(buf, o, nil, 0)
9494
kv.Iterate(func(i int, key, value []byte) {
9595
tw.Append(key, value)
9696
})

leveldb/table/writer.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ type Writer struct {
146146
compression opt.Compression
147147
blockSize int
148148

149+
bpool *util.BufferPool
149150
dataBlock blockWriter
150151
indexBlock blockWriter
151152
filterBlock filterWriter
@@ -285,6 +286,16 @@ func (w *Writer) BytesLen() int {
285286
// after Close, but calling BlocksLen, EntriesLen and BytesLen
286287
// is still possible.
287288
func (w *Writer) Close() error {
289+
defer func() {
290+
if w.bpool != nil {
291+
// Buffer.Bytes() returns [offset:] of the buffer.
292+
// We need to Reset() so that the offset = 0, resulting
293+
// in buf.Bytes() returning the whole allocated bytes.
294+
w.dataBlock.buf.Reset()
295+
w.bpool.Put(w.dataBlock.buf.Bytes())
296+
}
297+
}()
298+
288299
if w.err != nil {
289300
return w.err
290301
}
@@ -351,14 +362,24 @@ func (w *Writer) Close() error {
351362
// NewWriter creates a new initialized table writer for the file.
352363
//
353364
// Table writer is not safe for concurrent use.
354-
func NewWriter(f io.Writer, o *opt.Options) *Writer {
365+
func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer {
366+
var bufBytes []byte
367+
if pool == nil {
368+
bufBytes = make([]byte, size)
369+
} else {
370+
bufBytes = pool.Get(size)
371+
}
372+
bufBytes = bufBytes[:0]
373+
355374
w := &Writer{
356375
writer: f,
357376
cmp: o.GetComparer(),
358377
filter: o.GetFilter(),
359378
compression: o.GetCompression(),
360379
blockSize: o.GetBlockSize(),
361380
comparerScratch: make([]byte, 0),
381+
bpool: pool,
382+
dataBlock: blockWriter{buf: *util.NewBuffer(bufBytes)},
362383
}
363384
// data block
364385
w.dataBlock.restartInterval = o.GetBlockRestartInterval()

0 commit comments

Comments
 (0)