Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,21 @@ type BinaryTOC struct {
}

// WriteBinary build index-header file from the pieces of index in object storage.
func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, fn string) (err error) {
func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) (err error) {
ir, indexVersion, err := newChunkedIndexReader(ctx, bkt, id)
if err != nil {
return errors.Wrap(err, "new index reader")
}
tmpFilename := filename + ".tmp"

// Buffer for copying and encbuffers.
// This also will control the size of file writer buffer.
buf := make([]byte, 32*1024)
bw, err := newBinaryWriter(fn, buf)
bw, err := newBinaryWriter(tmpFilename, buf)
if err != nil {
return errors.Wrap(err, "new binary index header writer")
}
defer runutil.CloseWithErrCapture(&err, bw, "close binary writer for %s", fn)
defer runutil.CloseWithErrCapture(&err, bw, "close binary writer for %s", tmpFilename)

if err := bw.AddIndexMeta(indexVersion, ir.toc.PostingsTable); err != nil {
return errors.Wrap(err, "add index meta")
Expand All @@ -109,7 +110,17 @@ func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, f
if err := bw.WriteTOC(); err != nil {
return errors.Wrap(err, "write index header TOC")
}
return nil

if err := bw.f.Flush(); err != nil {
return errors.Wrap(err, "flush")
}

if err := bw.f.f.Sync(); err != nil {
return errors.Wrap(err, "sync")
}

// Create index-header in atomic way, to avoid partial writes (e.g during restart or crash of store GW).
return os.Rename(tmpFilename, filename)
}

type chunkedIndexReader struct {
Expand Down