Skip to content

Commit 34ad98e

Browse files
GiedriusSbwplotka
authored andcommitted
store/bucket: make getFor() work with interleaved resolutions (#1146)
* store/bucket_test: add interleaved resolutions test for getFor() * store/bucket: include blocks in the middle as well * store/bucket: add test cases with duplicated time ranges * query/querier: send proper maxSourceResolution Without this, we get max resolutions like 1, 2, 3 which do not mean anything to getFor(). With this, we get proper data from Thanos Store. * README: add entry * query/querier_test: add queryableCreator test Makes a querier via queryableCreator and checks if the maxSourceResolution was passed properly. * store/bucket: do the iteration without sorting * store/bucket: bsi->j in loop Makes it clearer that it's just a temporary variable for the loop. * store/bucket: fix according to review comments * Convert parseDownsamplingParam() into parseDownsamplingParamMillis() which properly returns int64 for use in the querier code * Add parseDownsamplingMillis() tests * query/querier_test: fix test * *: clarify everywhere that max source resolution is in millis * *: maxSourceResolutionMillis -> maxResolutionMillis * CHANGELOG: update * query/querier_test: fix * store/bucket: add gets all data in range property test * store/bucket_test: add production property test * store/bucket_test: fix * store/bucket_test: add always gets property * query/querier_test: do not shrink * store/bucket: revert change This doesn't really matter as the tests show. * store/bucket_test: remove more confusion * store/bucket: clean up tests Only leave the property tests in place since they catch all of the errors. * Simplified goFor implementation. Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
1 parent 43102df commit 34ad98e

File tree

9 files changed

+275
-44
lines changed

9 files changed

+275
-44
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1111

1212
## Unreleased
1313

14+
### Fixed
15+
16+
- [#1146](https://github.com/improbable-eng/thanos/pull/1146) store/bucket: make getFor() work with interleaved resolutions
17+
1418
### Added
1519

1620
- [#1094](https://github.com/improbable-eng/thanos/pull/1094) Allow configuring the response header timeout for the S3 client.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/hashicorp/golang-lru v0.5.1
2222
github.com/hashicorp/memberlist v0.1.3
2323
github.com/julienschmidt/httprouter v1.1.0 // indirect
24+
github.com/leanovate/gopter v0.2.4
2425
github.com/lovoo/gcloud-opentracing v0.3.0
2526
github.com/miekg/dns v1.1.8
2627
github.com/minio/minio-go v0.0.0-20200511070425-f33eae714a28

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
155155
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
156156
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
157157
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
158+
github.com/leanovate/gopter v0.2.4 h1:U4YLBggDFhJdqQsG4Na2zX7joVTky9vHaj/AGEwSuXU=
159+
github.com/leanovate/gopter v0.2.4/go.mod h1:gNcbPWNEWRe4lm+bycKqxUYoH5uoVje5SkOJ3uoLer8=
158160
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
159161
github.com/lightstep/lightstep-tracer-go v0.15.6/go.mod h1:6AMpwZpsyCFwSovxzM78e+AsYxE8sGwiM6C3TytaWeI=
160162
github.com/lovoo/gcloud-opentracing v0.3.0 h1:nAeKG70rIsog0TelcEtt6KU0Y1s5qXtsDLnHp0urPLU=

pkg/query/api/v1.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,9 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
203203
return enableDeduplication, nil
204204
}
205205

206-
func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (maxSourceResolution time.Duration, _ *ApiError) {
206+
func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) {
207207
const maxSourceResolutionParam = "max_source_resolution"
208-
maxSourceResolution = 0 * time.Second
208+
maxSourceResolution := 0 * time.Second
209209

210210
if api.enableAutodownsampling {
211211
// If no max_source_resolution is specified fit at least 5 samples between steps.
@@ -223,7 +223,7 @@ func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (max
223223
return 0, &ApiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
224224
}
225225

226-
return maxSourceResolution, nil
226+
return int64(maxSourceResolution / time.Millisecond), nil
227227
}
228228

229229
func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialResponse bool, _ *ApiError) {
@@ -366,7 +366,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
366366
return nil, nil, apiErr
367367
}
368368

369-
maxSourceResolution, apiErr := api.parseDownsamplingParam(r, step)
369+
maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step)
370370
if apiErr != nil {
371371
return nil, nil, apiErr
372372
}

pkg/query/api/v1_test.go

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"time"
3131

3232
"github.com/go-kit/kit/log"
33+
"github.com/improbable-eng/thanos/pkg/compact"
3334
"github.com/improbable-eng/thanos/pkg/query"
3435
"github.com/improbable-eng/thanos/pkg/testutil"
3536
opentracing "github.com/opentracing/opentracing-go"
@@ -42,7 +43,7 @@ import (
4243
)
4344

4445
func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
45-
return func(_ bool, _ time.Duration, _ bool, _ query.WarningReporter) storage.Queryable {
46+
return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
4647
return queryable
4748
}
4849
}
@@ -833,3 +834,71 @@ func BenchmarkQueryResultEncoding(b *testing.B) {
833834
testutil.Ok(b, err)
834835
fmt.Println(len(c))
835836
}
837+
838+
func TestParseDownsamplingParamMillis(t *testing.T) {
839+
var tests = []struct {
840+
maxSourceResolutionParam string
841+
result int64
842+
step time.Duration
843+
fail bool
844+
enableAutodownsampling bool
845+
}{
846+
{
847+
maxSourceResolutionParam: "0s",
848+
enableAutodownsampling: false,
849+
step: time.Hour,
850+
result: int64(compact.ResolutionLevelRaw),
851+
fail: false,
852+
},
853+
{
854+
maxSourceResolutionParam: "5m",
855+
step: time.Hour,
856+
enableAutodownsampling: false,
857+
result: int64(compact.ResolutionLevel5m),
858+
fail: false,
859+
},
860+
{
861+
maxSourceResolutionParam: "1h",
862+
step: time.Hour,
863+
enableAutodownsampling: false,
864+
result: int64(compact.ResolutionLevel1h),
865+
fail: false,
866+
},
867+
{
868+
maxSourceResolutionParam: "",
869+
enableAutodownsampling: true,
870+
step: time.Hour,
871+
result: int64(time.Hour / (5 * 1000 * 1000)),
872+
fail: false,
873+
},
874+
{
875+
maxSourceResolutionParam: "",
876+
enableAutodownsampling: true,
877+
step: time.Hour,
878+
result: int64((1 * time.Hour) / 6),
879+
fail: true,
880+
},
881+
{
882+
maxSourceResolutionParam: "",
883+
enableAutodownsampling: true,
884+
step: time.Hour,
885+
result: int64((1 * time.Hour) / 6),
886+
fail: true,
887+
},
888+
}
889+
890+
for i, test := range tests {
891+
api := API{enableAutodownsampling: test.enableAutodownsampling}
892+
v := url.Values{}
893+
v.Set("max_source_resolution", test.maxSourceResolutionParam)
894+
r := http.Request{PostForm: v}
895+
896+
maxResMillis, _ := api.parseDownsamplingParamMillis(&r, test.step)
897+
if test.fail == false {
898+
testutil.Assert(t, maxResMillis == test.result, "case %v: expected %v to be equal to %v", i, maxResMillis, test.result)
899+
} else {
900+
testutil.Assert(t, maxResMillis != test.result, "case %v: expected %v not to be equal to %v", i, maxResMillis, test.result)
901+
}
902+
903+
}
904+
}

pkg/query/querier.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"sort"
66
"strings"
77

8-
"time"
9-
108
"github.com/go-kit/kit/log"
119
"github.com/improbable-eng/thanos/pkg/store/storepb"
1210
"github.com/improbable-eng/thanos/pkg/tracing"
@@ -24,19 +22,19 @@ type WarningReporter func(error)
2422

2523
// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
2624
// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default.
27-
// maxSourceResolution controls downsampling resolution that is allowed.
25+
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
2826
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
29-
type QueryableCreator func(deduplicate bool, maxSourceResolution time.Duration, partialResponse bool, r WarningReporter) storage.Queryable
27+
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable
3028

3129
// NewQueryableCreator creates QueryableCreator.
3230
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator {
33-
return func(deduplicate bool, maxSourceResolution time.Duration, partialResponse bool, r WarningReporter) storage.Queryable {
31+
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable {
3432
return &queryable{
3533
logger: logger,
3634
replicaLabel: replicaLabel,
3735
proxy: proxy,
3836
deduplicate: deduplicate,
39-
maxSourceResolution: maxSourceResolution,
37+
maxResolutionMillis: maxResolutionMillis,
4038
partialResponse: partialResponse,
4139
warningReporter: r,
4240
}
@@ -48,14 +46,14 @@ type queryable struct {
4846
replicaLabel string
4947
proxy storepb.StoreServer
5048
deduplicate bool
51-
maxSourceResolution time.Duration
49+
maxResolutionMillis int64
5250
partialResponse bool
5351
warningReporter WarningReporter
5452
}
5553

5654
// Querier returns a new storage querier against the underlying proxy store API.
5755
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
58-
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.partialResponse, q.warningReporter), nil
56+
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil
5957
}
6058

6159
type querier struct {
@@ -66,7 +64,7 @@ type querier struct {
6664
replicaLabel string
6765
proxy storepb.StoreServer
6866
deduplicate bool
69-
maxSourceResolution int64
67+
maxResolutionMillis int64
7068
partialResponse bool
7169
warningReporter WarningReporter
7270
}
@@ -80,7 +78,7 @@ func newQuerier(
8078
replicaLabel string,
8179
proxy storepb.StoreServer,
8280
deduplicate bool,
83-
maxSourceResolution int64,
81+
maxResolutionMillis int64,
8482
partialResponse bool,
8583
warningReporter WarningReporter,
8684
) *querier {
@@ -100,7 +98,7 @@ func newQuerier(
10098
replicaLabel: replicaLabel,
10199
proxy: proxy,
102100
deduplicate: deduplicate,
103-
maxSourceResolution: maxSourceResolution,
101+
maxResolutionMillis: maxResolutionMillis,
104102
partialResponse: partialResponse,
105103
warningReporter: warningReporter,
106104
}
@@ -185,7 +183,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
185183
MinTime: q.mint,
186184
MaxTime: q.maxt,
187185
Matchers: sms,
188-
MaxResolutionWindow: q.maxSourceResolution,
186+
MaxResolutionWindow: q.maxResolutionMillis,
189187
Aggregates: queryAggrs,
190188
PartialResponseDisabled: !q.partialResponse,
191189
}, resp); err != nil {

pkg/query/querier_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,25 @@ import (
1919
"github.com/prometheus/tsdb/chunkenc"
2020
)
2121

22+
func TestQueryableCreator_MaxResolution(t *testing.T) {
23+
defer leaktest.CheckTimeout(t, 10*time.Second)()
24+
testProxy := &storeServer{resps: []*storepb.SeriesResponse{}}
25+
queryableCreator := NewQueryableCreator(nil, testProxy, "test")
26+
27+
oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
28+
queryable := queryableCreator(false, oneHourMillis, false, func(err error) {})
29+
30+
q, err := queryable.Querier(context.Background(), 0, 42)
31+
testutil.Ok(t, err)
32+
defer func() { testutil.Ok(t, q.Close()) }()
33+
34+
querierActual, ok := q.(*querier)
35+
36+
testutil.Assert(t, ok == true, "expected it to be a querier")
37+
testutil.Assert(t, querierActual.maxResolutionMillis == oneHourMillis, "expected max source resolution to be 1 hour in milliseconds")
38+
39+
}
40+
2241
func TestQuerier_Series(t *testing.T) {
2342
defer leaktest.CheckTimeout(t, 10*time.Second)()
2443

pkg/store/bucket.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
675675
// labels and resolution. This is important because we allow mixed resolution results, so it is quite crucial
676676
// to be aware what exactly resolution we see on query.
677677
// TODO(bplotka): Consider adding resolution label to all results to propagate that info to UI and Query API.
678-
func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels.Labels, bs []*bucketBlock) {
678+
func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMillis int64, lset labels.Labels, bs []*bucketBlock) {
679679
if len(bs) == 0 {
680680
level.Debug(logger).Log("msg", "No block found", "mint", mint, "maxt", maxt, "lset", lset.String())
681681
return
@@ -703,7 +703,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
703703

704704
parts = append(parts, fmt.Sprintf("Range: %d-%d Resolution: %d", currMin, currMax, currRes))
705705

706-
level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n"))
706+
level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "Maximum Resolution", maxResolutionMillis, "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n"))
707707
}
708708

709709
// Series implements the storepb.StoreServer interface.
@@ -738,7 +738,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
738738
blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow)
739739

740740
if s.debugLogging {
741-
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, bs.labels, blocks)
741+
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
742742
}
743743

744744
for _, b := range blocks {
@@ -934,7 +934,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
934934
type bucketBlockSet struct {
935935
labels labels.Labels
936936
mtx sync.RWMutex
937-
resolutions []int64 // available resolution, high to low
937+
resolutions []int64 // available resolution, high to low (in milliseconds)
938938
blocks [][]*bucketBlock // ordered buckets for the existing resolutions
939939
}
940940

@@ -996,8 +996,8 @@ func int64index(s []int64, x int64) int {
996996
}
997997

998998
// getFor returns a time-ordered list of blocks that cover date between mint and maxt.
999-
// Blocks with the lowest resolution possible but not lower than the given resolution are returned.
1000-
func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBlock) {
999+
// Blocks with the biggest resolution possible but not bigger than the given max resolution are returned.
1000+
func (s *bucketBlockSet) getFor(mint, maxt, maxResolutionMillis int64) (bs []*bucketBlock) {
10011001
if mint == maxt {
10021002
return nil
10031003
}
@@ -1007,33 +1007,31 @@ func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBl
10071007

10081008
// Find first matching resolution.
10091009
i := 0
1010-
for ; i < len(s.resolutions) && s.resolutions[i] > minResolution; i++ {
1010+
for ; i < len(s.resolutions) && s.resolutions[i] > maxResolutionMillis; i++ {
10111011
}
10121012

1013-
// Base case, we fill the given interval with the closest resolution.
1013+
// Fill the given interval with the blocks for the current resolution.
1014+
// Our current resolution might not cover all data, so recursively fill the gaps with higher resolution blocks if there is any.
1015+
start := mint
10141016
for _, b := range s.blocks[i] {
10151017
if b.meta.MaxTime <= mint {
10161018
continue
10171019
}
10181020
if b.meta.MinTime >= maxt {
10191021
break
10201022
}
1023+
1024+
if i+1 < len(s.resolutions) {
1025+
bs = append(bs, s.getFor(start, b.meta.MinTime, s.resolutions[i+1])...)
1026+
}
10211027
bs = append(bs, b)
1028+
start = b.meta.MaxTime
10221029
}
1023-
// Our current resolution might not cover all data, recursively fill the gaps at the start
1024-
// and end of [mint, maxt] with higher resolution blocks.
1025-
i++
1026-
// No higher resolution left, we are done.
1027-
if i >= len(s.resolutions) {
1028-
return bs
1029-
}
1030-
if len(bs) == 0 {
1031-
return s.getFor(mint, maxt, s.resolutions[i])
1032-
}
1033-
left := s.getFor(mint, bs[0].meta.MinTime, s.resolutions[i])
1034-
right := s.getFor(bs[len(bs)-1].meta.MaxTime, maxt, s.resolutions[i])
10351030

1036-
return append(left, append(bs, right...)...)
1031+
if i+1 < len(s.resolutions) {
1032+
bs = append(bs, s.getFor(start, maxt, s.resolutions[i+1])...)
1033+
}
1034+
return bs
10371035
}
10381036

10391037
// labelMatchers verifies whether the block set matches the given matchers and returns a new

0 commit comments

Comments
 (0)