Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressed comments.
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jan 20, 2020
commit 462d6072c57c3b3822275c29e9e16632e5773f00
84 changes: 49 additions & 35 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"sort"
"time"
"unsafe"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -39,6 +40,8 @@ const (
MagicIndex = 0xBAAAD792

symbolFactor = 32

postingLengthFieldSize = 4
)

// The table gets initialized with sync.Once but may still cause a race
Expand All @@ -60,7 +63,7 @@ func newCRC32() hash.Hash32 {
type BinaryTOC struct {
// Symbols holds start to the same symbols section as index related to this index header.
Symbols uint64
// PostingsTable holds start to the the same Postings Offset Table section as index related to this index header.
// PostingsOffsetTable holds start to the same Postings Offset Table section as index related to this index header.
PostingsOffsetTable uint64
}

Expand Down Expand Up @@ -383,6 +386,11 @@ func (w *binaryWriter) Close() error {
return w.f.Close()
}

type postingValueOffsets struct {
offsets []postingOffset
lastValOffset int64
}

type postingOffset struct {
// label value.
value string
Expand All @@ -399,7 +407,7 @@ type BinaryReader struct {

// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
postings map[string][]postingOffset
postings map[string]*postingValueOffsets
// For the v1 format, labelname -> labelvalue -> offset.
postingsV1 map[string]map[string]index.Range

Expand All @@ -422,13 +430,14 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
return br, nil
}

level.Warn(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err)
level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err)

start := time.Now()
if err := WriteBinary(ctx, bkt, id, binfn); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice tracking a histogram metric with the time it takes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. let's start with elapsed time in log line WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

return nil, errors.Wrap(err, "write index header")
}

level.Debug(logger).Log("msg", "build index-header file", "path", binfn, "err", err)
level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we don't have any metric whenever an index-header is created, if there's a bug which re-creates the index-header for a block each time (because the reader detects it as corrupted) it would be very difficult to notice, unless you run Thanos with debug logging. Switching this log to Info may help.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. In this case we need a metric (:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add that in next PR. (:


return newFileBinaryReader(binfn)
}
Expand All @@ -447,7 +456,7 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
r := &BinaryReader{
b: realByteSlice(f.Bytes()),
c: f,
postings: map[string][]postingOffset{},
postings: map[string]*postingValueOffsets{},
}

// Verify header.
Expand Down Expand Up @@ -487,9 +496,9 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
if len(key) != 2 {
return errors.Errorf("unexpected key length for posting table %d", len(key))
}
// TODO(bwplotka): This is wrong, probably we have to sort.

if lastKey != nil {
prevRng.End = int64(off + 4)
prevRng.End = int64(off - crc32.Size)
r.postingsV1[lastKey[0]][lastKey[1]] = prevRng
}

Expand All @@ -499,13 +508,13 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
}

lastKey = key
prevRng = index.Range{Start: int64(off + 4)}
prevRng = index.Range{Start: int64(off + postingLengthFieldSize)}
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
prevRng.End = r.indexLastPostingEnd + 4
prevRng.End = r.indexLastPostingEnd - crc32.Size
r.postingsV1[lastKey[0]][lastKey[1]] = prevRng
}
} else {
Expand All @@ -521,34 +530,41 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {

if _, ok := r.postings[key[0]]; !ok {
// Next label name.
r.postings[key[0]] = []postingOffset{}
r.postings[key[0]] = &postingValueOffsets{}
if lastKey != nil {
// Always include last value for each label name.
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff})
if valueCount%symbolFactor != 0 {
// Always include last value for each label name.
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
r.postings[lastKey[0]].lastValOffset = int64(off - crc32.Size)
lastKey = nil
}
valueCount = 0
}

lastKey = key
if valueCount%symbolFactor == 0 {
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], tableOff: tableOff})
lastKey = nil
r.postings[key[0]].offsets = append(r.postings[key[0]].offsets, postingOffset{value: key[1], tableOff: tableOff})
return nil
}
lastKey = key

lastTableOff = tableOff
valueCount++
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff})
if valueCount%symbolFactor != 0 {
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
r.postings[lastKey[0]].lastValOffset = r.indexLastPostingEnd - crc32.Size
}
// Trim any extra space in the slices.
for k, v := range r.postings {
l := make([]postingOffset, len(v))
copy(l, v)
r.postings[k] = l
l := make([]postingOffset, len(v.offsets))
copy(l, v.offsets)
r.postings[k].offsets = l
}
}

Expand Down Expand Up @@ -637,7 +653,7 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran

skip := 0
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e[0].value {
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
valueIndex++
}
Expand All @@ -646,19 +662,19 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
for valueIndex < len(values) {
value := values[valueIndex]

i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
if i == len(e) {
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= value })
if i == len(e.offsets) {
// We're past the end.
break
}
if i > 0 && e[i].value != value {
if i > 0 && e.offsets[i].value != value {
// Need to look from previous entry.
i--
}
// Don't Crc32 the entire postings offset table, this is very slow
// so hope any issues were caught at startup.
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e[i].tableOff)
d.Skip(e.offsets[i].tableOff)

tmpRngs = tmpRngs[:0]
// Iterate on the offset table.
Expand All @@ -677,31 +693,29 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
postingOffset := int64(d.Uvarint64()) // Offset.
for string(v) >= value {
if string(v) == value {
// Actual posting is 4 bytes after offset, which includes length.
tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + 4})
tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
}
valueIndex++
if valueIndex == len(values) {
break
}
value = values[valueIndex]
}
if i+1 == len(e) {
if i+1 == len(e.offsets) {
for i := range tmpRngs {
tmpRngs[i].End = r.indexLastPostingEnd
tmpRngs[i].End = e.lastValOffset
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
break
}

if value >= e[i+1].value || valueIndex == len(values) {
if value >= e.offsets[i+1].value || valueIndex == len(values) {
d.Skip(skip)
d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64()) // Offset.
for j := range tmpRngs {
// Actual posting end is 4 bytes before next offset.
tmpRngs[j].End = postingOffset - 4
tmpRngs[j].End = postingOffset - crc32.Size
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
Expand Down Expand Up @@ -748,14 +762,14 @@ func (r BinaryReader) LabelValues(name string) ([]string, error) {
if !ok {
return nil, nil
}
if len(e) == 0 {
if len(e.offsets) == 0 {
return nil, nil
}
values := make([]string, 0, len(e)*symbolFactor)
values := make([]string, 0, len(e.offsets)*symbolFactor)

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e[0].tableOff)
lastVal := e[len(e)-1].value
d.Skip(e.offsets[0].tableOff)
lastVal := e.offsets[len(e.offsets)-1].value

skip := 0
for d.Err() == nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/block/indexheader/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Reader interface {
IndexVersion() int

// PostingsOffset returns start and end offsets of postings for given name and value.
// end offset might be bigger than actual posting ending, but not larger then the whole index file.
// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
// NotFoundRangeErr is returned when no index can be found for given name and value.
// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
PostingsOffset(name string, value string) (index.Range, error)
Expand All @@ -27,7 +27,9 @@ type Reader interface {
// Error is return if the symbol can't be found.
LookupSymbol(o uint32) (string, error)

// LabelValues returns all label values for given label name or error if not found.
// LabelValues returns all label values for given label name or error.
// If no values are found for the label name, or the label name does not exist,
// then an empty slice is returned and no error.
LabelValues(name string) ([]string, error)

// LabelNames returns all label names.
Expand Down
33 changes: 18 additions & 15 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe

minStart := int64(math.MaxInt64)
maxEnd := int64(math.MinInt64)
for _, lname := range expLabelNames {
for il, lname := range expLabelNames {
expectedLabelVals, err := indexReader.LabelValues(lname)
testutil.Ok(t, err)

vals, err := headerReader.LabelValues(lname)
testutil.Ok(t, err)
testutil.Equals(t, expectedLabelVals, vals)

for i, v := range vals {
for iv, v := range vals {
if minStart > expRanges[labels.Label{Name: lname, Value: v}].Start {
minStart = expRanges[labels.Label{Name: lname, Value: v}].Start
}
Expand All @@ -195,32 +195,35 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe

// For index-cache those values are exact.
//
// For binary they are exact except:
// * formatV2: last item posting offset. It's good enough if the value is larger than exact posting ending.
// * formatV1: all items.
if i == len(vals)-1 || indexReader.Version() == index.FormatV1 {
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
continue
// For binary they are exact except last item posting offset. It's good enough if the value is larger than exact posting ending.
if indexReader.Version() == index.FormatV2 {
if iv == len(vals)-1 && il == len(expLabelNames)-1 {
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
continue
}
} else {
// For index formatV1 the last one does not literally mean the last value, as postings were not sorted.
// Account for that. We know the last value is "40".
if v == "40" {
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
continue
}
}
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}], ptr)
}
}

ptr, err := headerReader.PostingsOffset(index.AllPostingsKey())
testutil.Ok(t, err)
// For AllPostingsKey the end offset is also too large, which is handled well further on.
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: "", Value: ""}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: "", Value: ""}].End)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)

vals, err := indexReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

vals, err = headerReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

_, err = headerReader.PostingsOffset("not-existing", "1")
testutil.NotOk(t, err)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,8 +1565,13 @@ func resizePostings(b []byte) ([]byte, error) {
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}
// 4 for posting length, then n * 4, foreach each big endian posting.
return b[:4+n*4], nil

// 4 bytes for the number of postings, then 4 bytes for each big endian posting.
size := 4 + n*4
if len(b) <= size {
return nil, encoding.ErrInvalidSize
}
return b[:size], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
Expand Down