Skip to content

Commit 89d80a6

Browse files
authored
Feature/querysharding ii (#1927)
* [wip] sharding evaluator/ast * [wip] continues experimenting with ast mapping * refactoring in preparation for binops * evaluators can pass state to other evaluators * compiler alignment * Evaluator method renamed to StepEvaluator * chained evaluator impl * tidying up sharding code * handling for ConcatSampleExpr * downstream iterator * structure for downstreaming asts * outlines sharding optimizations * work on sharding mapper * ast sharding optimizations * test for different logrange positions * shard mapper tests * stronger ast sharding & tests * shardmapper tests for string->string * removes sharding evaluator code * removes unused ctx arg * Revert "removes sharding evaluator code" This reverts commit 55d41b9. * interfaces for downstreaming, type conversions * sharding plumbing on frontend * type alignment in queryrange to downstream sharded queriers * downstreaming support for sharding incl storage code * removes chainedevaluator * comment alignment * storage shard injection * speccing out testware for sharding equivalence * [wip] shared engine refactor * sorting streams, sharding eval fixes * downstream evaluator embeds defaultevaluator * other pkgs adopt logql changes * metrics & logs use same middleware instantiation process * wires up shardingware * middleware per metrics/logfilter * empty step populating StepEvaluator promql.Matrix adapter * sharding metrics * log/span injection into sharded engine * sharding metrics avoids multiple instantiation * downstreamhandler tracing * sharding parameterized libsonnet * removes querier replicas * default 32 concurrency for workers * jsonnet correct level override * unquote true in yaml * lowercase error + downstreamEvaluator defaults to embedded defaultEvaluator * makes shardRecorder private * logs query on failed parse * refactors engine to be multi-use, minimizes logger injection, generalizes Query methods, removes Engine interface * basic tests for querysharding mware * [wip] concurrent evaluator * integrates 
stat propagation into sharding evaluator * splitby histogram * extends le bounds for bytes processed * byte throughput histogram buckets to 40gb * chunk duration mixin * fixes merge w/ field rename * derives logger in sharded engine via ctx & logs some downstream evaluators * moves sharded engine to top, adds comments * logs failed merge results in stats ctx * snapshotting stats merge logic is done more effectively * per query concurrency controlled via downstreamer * unexports decodereq * queryrange testware * downstreamer tests * pr requests
1 parent 156023a commit 89d80a6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2808
-377
lines changed

pkg/logcli/query/query.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,29 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
119119

120120
eng := logql.NewEngine(conf.Querier.Engine, querier)
121121
var query logql.Query
122+
122123
if q.isInstant() {
123-
query = eng.NewInstantQuery(q.QueryString, q.Start, q.resultsDirection(), uint32(q.Limit))
124+
query = eng.Query(logql.NewLiteralParams(
125+
q.QueryString,
126+
q.Start,
127+
q.Start,
128+
0,
129+
0,
130+
q.resultsDirection(),
131+
uint32(q.Limit),
132+
nil,
133+
))
124134
} else {
125-
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.Interval, q.resultsDirection(), uint32(q.Limit))
135+
query = eng.Query(logql.NewLiteralParams(
136+
q.QueryString,
137+
q.Start,
138+
q.End,
139+
q.Step,
140+
q.Interval,
141+
q.resultsDirection(),
142+
uint32(q.Limit),
143+
nil,
144+
))
126145
}
127146

128147
// execute the query

pkg/loghttp/params.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func direction(r *http.Request) (logproto.Direction, error) {
4444
return parseDirection(r.Form.Get("direction"), logproto.BACKWARD)
4545
}
4646

47+
func shards(r *http.Request) []string {
48+
return r.Form["shards"]
49+
}
50+
4751
func bounds(r *http.Request) (time.Time, time.Time, error) {
4852
now := time.Now()
4953
start, err := parseTimestamp(r.Form.Get("start"), now.Add(-defaultSince))

pkg/loghttp/query.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ type RangeQuery struct {
244244
Query string
245245
Direction logproto.Direction
246246
Limit uint32
247+
Shards []string
247248
}
248249

249250
// ParseRangeQuery parses a RangeQuery request from an http request.
@@ -280,6 +281,8 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
280281
return nil, errNegativeStep
281282
}
282283

284+
result.Shards = shards(r)
285+
283286
// For safety, limit the number of returned points per timeseries.
284287
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
285288
if (result.End.Sub(result.Start) / result.Step) > 11000 {

pkg/logproto/extensions.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logproto
22

33
import "github.com/prometheus/prometheus/pkg/labels"
44

5+
// Note: sorting SeriesIdentifiers is not very efficient and its use should be minimized, as it requires label construction on each comparison.
56
type SeriesIdentifiers []SeriesIdentifier
67

78
func (ids SeriesIdentifiers) Len() int { return len(ids) }
@@ -10,3 +11,9 @@ func (ids SeriesIdentifiers) Less(i, j int) bool {
1011
a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels)
1112
return labels.Compare(a, b) <= 0
1213
}
14+
15+
type Streams []Stream
16+
17+
func (xs Streams) Len() int { return len(xs) }
18+
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
19+
// Less reports whether the stream at index i must sort before the stream at index j,
// comparing their Labels fields. The comparison is strict: sort.Interface treats two
// elements as equal only when neither is Less than the other, which `<=` can never satisfy.
func (xs Streams) Less(i, j int) bool { return xs[i].Labels < xs[j].Labels }

pkg/logproto/logproto.pb.go

Lines changed: 147 additions & 74 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/logproto/logproto.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ message QueryRequest {
3737
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
3838
Direction direction = 5;
3939
reserved 6;
40+
repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"];
41+
4042
}
4143

4244
enum Direction {

pkg/logql/astmapper.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

pkg/logql/engine.go

Lines changed: 54 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logql
22

33
import (
44
"context"
5+
"errors"
56
"sort"
67
"time"
78

@@ -35,6 +36,12 @@ const ValueTypeStreams = "streams"
3536
// Streams is promql.Value
3637
type Streams []logproto.Stream
3738

39+
func (streams Streams) Len() int { return len(streams) }
40+
func (streams Streams) Swap(i, j int) { streams[i], streams[j] = streams[j], streams[i] }
41+
func (streams Streams) Less(i, j int) bool {
42+
return streams[i].Labels <= streams[j].Labels
43+
}
44+
3845
// Type implements `promql.Value`
3946
func (Streams) Type() promql.ValueType { return ValueTypeStreams }
4047

@@ -67,31 +74,29 @@ func (opts *EngineOpts) applyDefault() {
6774
}
6875
}
6976

70-
// Engine interface used to construct queries
71-
type Engine interface {
72-
NewRangeQuery(qs string, start, end time.Time, step, interval time.Duration, direction logproto.Direction, limit uint32) Query
73-
NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query
74-
}
75-
76-
// engine is the LogQL engine.
77-
type engine struct {
77+
// Engine is the LogQL engine.
78+
type Engine struct {
7879
timeout time.Duration
7980
evaluator Evaluator
8081
}
8182

82-
// NewEngine creates a new LogQL engine.
83-
func NewEngine(opts EngineOpts, q Querier) Engine {
84-
if q == nil {
85-
panic("nil Querier")
86-
}
87-
83+
// NewEngine creates a new LogQL Engine.
84+
func NewEngine(opts EngineOpts, q Querier) *Engine {
8885
opts.applyDefault()
86+
return &Engine{
87+
timeout: opts.Timeout,
88+
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
89+
}
90+
}
8991

90-
return &engine{
91-
timeout: opts.Timeout,
92-
evaluator: &defaultEvaluator{
93-
querier: q,
94-
maxLookBackPeriod: opts.MaxLookBackPeriod,
92+
// Query creates a new LogQL query. Instant/Range type is derived from the parameters.
93+
func (ng *Engine) Query(params Params) Query {
94+
return &query{
95+
timeout: ng.timeout,
96+
params: params,
97+
evaluator: ng.evaluator,
98+
parse: func(_ context.Context, query string) (Expr, error) {
99+
return ParseExpr(query)
95100
},
96101
}
97102
}
@@ -103,17 +108,18 @@ type Query interface {
103108
}
104109

105110
type query struct {
106-
LiteralParams
107-
108-
ng *engine
111+
timeout time.Duration
112+
params Params
113+
parse func(context.Context, string) (Expr, error)
114+
evaluator Evaluator
109115
}
110116

111-
// Exec Implements `Query`
117+
// Exec implements `Query`. It handles instrumentation & defers to Eval.
112118
func (q *query) Exec(ctx context.Context) (Result, error) {
113-
log, ctx := spanlogger.New(ctx, "Engine.Exec")
119+
log, ctx := spanlogger.New(ctx, "query.Exec")
114120
defer log.Finish()
115121

116-
rangeType := GetRangeType(q)
122+
rangeType := GetRangeType(q.params)
117123
timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType)))
118124
defer timer.ObserveDuration()
119125

@@ -122,7 +128,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
122128
start := time.Now()
123129
ctx = stats.NewContext(ctx)
124130

125-
data, err := q.ng.exec(ctx, q)
131+
data, err := q.Eval(ctx)
126132

127133
statResult = stats.Snapshot(ctx, time.Since(start))
128134
statResult.Log(level.Debug(log))
@@ -134,88 +140,49 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
134140
status = "400"
135141
}
136142
}
137-
RecordMetrics(ctx, q, status, statResult)
143+
RecordMetrics(ctx, q.params, status, statResult)
138144

139145
return Result{
140146
Data: data,
141147
Statistics: statResult,
142148
}, err
143149
}
144150

145-
// NewRangeQuery creates a new LogQL range query.
146-
func (ng *engine) NewRangeQuery(
147-
qs string,
148-
start, end time.Time, step time.Duration, interval time.Duration,
149-
direction logproto.Direction, limit uint32) Query {
150-
return &query{
151-
LiteralParams: LiteralParams{
152-
qs: qs,
153-
start: start,
154-
end: end,
155-
step: step,
156-
interval: interval,
157-
direction: direction,
158-
limit: limit,
159-
},
160-
ng: ng,
161-
}
162-
}
163-
164-
// NewInstantQuery creates a new LogQL instant query.
165-
func (ng *engine) NewInstantQuery(
166-
qs string,
167-
ts time.Time,
168-
direction logproto.Direction, limit uint32) Query {
169-
return &query{
170-
LiteralParams: LiteralParams{
171-
qs: qs,
172-
start: ts,
173-
end: ts,
174-
step: 0,
175-
interval: 0,
176-
direction: direction,
177-
limit: limit,
178-
},
179-
ng: ng,
180-
}
181-
}
182-
183-
func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
184-
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
151+
func (q *query) Eval(ctx context.Context) (promql.Value, error) {
152+
ctx, cancel := context.WithTimeout(ctx, q.timeout)
185153
defer cancel()
186154

187-
qs := q.Query()
188-
189-
expr, err := ParseExpr(qs)
155+
expr, err := q.parse(ctx, q.params.Query())
190156
if err != nil {
191157
return nil, err
192158
}
193159

194160
switch e := expr.(type) {
195161
case SampleExpr:
196-
value, err := ng.evalSample(ctx, e, q)
162+
value, err := q.evalSample(ctx, e)
197163
return value, err
198164

199165
case LogSelectorExpr:
200-
iter, err := ng.evaluator.Iterator(ctx, e, q)
166+
iter, err := q.evaluator.Iterator(ctx, e, q.params)
201167
if err != nil {
202168
return nil, err
203169
}
170+
204171
defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
205-
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
172+
streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
206173
return streams, err
174+
default:
175+
// Reached when the parsed expression is neither a SampleExpr nor a LogSelectorExpr.
// errors.New does not interpolate format verbs, so the message is kept verb-free.
return nil, errors.New("unexpected expression type: cannot evaluate")
207176
}
208-
209-
return nil, nil
210177
}
211178

212179
// evalSample evaluates a SampleExpr
213-
func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (promql.Value, error) {
180+
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql.Value, error) {
214181
if lit, ok := expr.(*literalExpr); ok {
215-
return ng.evalLiteral(ctx, lit, q)
182+
return q.evalLiteral(ctx, lit)
216183
}
217184

218-
stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q)
185+
stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
219186
if err != nil {
220187
return nil, err
221188
}
@@ -224,7 +191,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
224191
seriesIndex := map[uint64]*promql.Series{}
225192

226193
next, ts, vec := stepEvaluator.Next()
227-
if GetRangeType(q) == InstantType {
194+
if GetRangeType(q.params) == InstantType {
228195
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
229196
return vec, nil
230197
}
@@ -262,21 +229,21 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
262229
return result, nil
263230
}
264231

265-
func (ng *engine) evalLiteral(_ context.Context, expr *literalExpr, q *query) (promql.Value, error) {
232+
func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql.Value, error) {
266233
s := promql.Scalar{
267-
T: q.Start().UnixNano() / int64(time.Millisecond),
234+
T: q.params.Start().UnixNano() / int64(time.Millisecond),
268235
V: expr.value,
269236
}
270237

271-
if GetRangeType(q) == InstantType {
238+
if GetRangeType(q.params) == InstantType {
272239
return s, nil
273240
}
274241

275-
return PopulateMatrixFromScalar(s, q.LiteralParams), nil
242+
return PopulateMatrixFromScalar(s, q.params), nil
276243

277244
}
278245

279-
func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.Matrix {
246+
func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix {
280247
var (
281248
start = params.Start()
282249
end = params.End()
@@ -286,7 +253,7 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M
286253
[]promql.Point,
287254
0,
288255
// allocate enough space for all needed entries
289-
int(params.End().Sub(params.Start())/params.Step())+1,
256+
int(end.Sub(start)/step)+1,
290257
),
291258
}
292259
)
@@ -329,10 +296,11 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte
329296
}
330297
}
331298

332-
result := make([]logproto.Stream, 0, len(streams))
299+
result := make(Streams, 0, len(streams))
333300
for _, stream := range streams {
334301
result = append(result, *stream)
335302
}
303+
sort.Sort(result)
336304
return result, i.Error()
337305
}
338306

0 commit comments

Comments
 (0)