Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: bucket sorting issue when a sub-aggregation contains a `date_his…
…togram`
  • Loading branch information
silenceqi committed Jan 7, 2025
commit afe6313310ae065dab92ec477aaadf95f337aabe
6 changes: 1 addition & 5 deletions model/insight/metric_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,8 @@ func (m *Metric) ValidateSortKey() error {
if !util.StringInArray([]string{"desc", "asc"}, sortItem.Direction){
return fmt.Errorf("unknown sort direction [%s]", sortItem.Direction)
}
if v, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){
if _, ok := mm[sortItem.Key]; !ok && !util.StringInArray([]string{"_key", "_count"}, sortItem.Key){
return fmt.Errorf("unknown sort key [%s]", sortItem.Key)
}else{
if v != nil && v.Statistic == "derivative" {
return fmt.Errorf("can not sort by pipeline agg [%s]", v.Statistic)
}
}
}
return nil
Expand Down
25 changes: 16 additions & 9 deletions plugin/api/insight/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
}
}
timeBeforeGroup := metric.AutoTimeBeforeGroup()
metricData := CollectMetricData(agg, timeBeforeGroup)
metricData, interval := CollectMetricData(agg, timeBeforeGroup)
formula := strings.TrimSpace(metric.Formula)
//support older versions for a single formula.
if metric.Formula != "" && len(metric.Formulas) == 0 {
Expand All @@ -278,12 +278,18 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
targetMetricData = metricData
} else {
params := map[string]interface{}{}
if metric.BucketSize != "" && metric.BucketSize != "auto" {
du, err := util.ParseDuration(metric.BucketSize)
if err != nil {
return nil, err
if metric.BucketSize != "" {
bucketSize := metric.BucketSize
if metric.BucketSize == "auto" && interval != "" {
bucketSize = interval
}
if interval != "" || bucketSize != "auto" {
du, err := util.ParseDuration(bucketSize)
if err != nil {
return nil, err
}
params["bucket_size_in_second"] = du.Seconds()
}
params["bucket_size_in_second"] = du.Seconds()
}
for _, md := range metricData {
targetData := insight.MetricData{
Expand All @@ -301,8 +307,8 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
if err != nil {
return nil, err
}
formula = msgBuffer.String()
expression, err := govaluate.NewEvaluableExpression(formula)
resolvedFormula := msgBuffer.String()
expression, err := govaluate.NewEvaluableExpression(resolvedFormula)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -336,7 +342,8 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
}
result, err := expression.Evaluate(parameters)
if err != nil {
return nil, err
log.Debugf("evaluate formula error: %v", err)
continue
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0) {
Expand Down
53 changes: 37 additions & 16 deletions plugin/api/insight/metric_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,27 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
"size": limit,
}
if i == grpLength - 1 && len(metric.Sort) > 0 {
var termsOrder []interface{}
for _, sortItem := range metric.Sort {
termsOrder = append(termsOrder, util.MapStr{sortItem.Key: sortItem.Direction})
//use bucket sort instead of terms order when time after group
if !timeBeforeGroup && len(metric.Sort) > 0 {
basicAggs["sort_field"] = util.MapStr{
"max_bucket": util.MapStr{
"buckets_path": fmt.Sprintf("time_buckets>%s", metric.Sort[0].Key),
},
}
basicAggs["bucket_sorter"] = util.MapStr{
"bucket_sort": util.MapStr{
"sort": []util.MapStr{
{"sort_field": util.MapStr{"order": metric.Sort[0].Direction}},
},
},
}
}else{
var termsOrder []interface{}
for _, sortItem := range metric.Sort {
termsOrder = append(termsOrder, util.MapStr{sortItem.Key: sortItem.Direction})
}
termsCfg["order"] = termsOrder
}
termsCfg["order"] = termsOrder
}
groupAgg := util.MapStr{
"terms": termsCfg,
Expand Down Expand Up @@ -245,20 +261,22 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
return queryDsl, nil
}

func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData {
func CollectMetricData(agg interface{}, timeBeforeGroup bool) ([]insight.MetricData, string) {
metricData := []insight.MetricData{}
var interval string
if timeBeforeGroup {
collectMetricDataOther(agg, nil, &metricData, nil)
interval = collectMetricDataOther(agg, nil, &metricData, nil)
} else {
collectMetricData(agg, nil, &metricData)
interval = collectMetricData(agg, nil, &metricData)
}
return metricData
return metricData, interval
}

// timeBeforeGroup => false
func collectMetricData(agg interface{}, groupValues []string, metricData *[]insight.MetricData) {
func collectMetricData(agg interface{}, groupValues []string, metricData *[]insight.MetricData) (interval string){
if aggM, ok := agg.(map[string]interface{}); ok {
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
interval, _ = timeBks["interval"].(string)
if bks, ok := timeBks["buckets"].([]interface{}); ok {
md := insight.MetricData{
Data: map[string][]insight.MetricDataItem{},
Expand All @@ -272,7 +290,7 @@ func collectMetricData(agg interface{}, groupValues []string, metricData *[]insi
continue
}

if vm, ok := v.(map[string]interface{}); ok && len(k) < 5 {
if vm, ok := v.(map[string]interface{}); ok && len(k) < 20 {
collectMetricDataItem(k, vm, &md, bkM["key"])
}

Expand Down Expand Up @@ -300,12 +318,12 @@ func collectMetricData(agg interface{}, groupValues []string, metricData *[]insi
newGroupValues := make([]string, 0, len(groupValues)+1)
newGroupValues = append(newGroupValues, groupValues...)
newGroupValues = append(newGroupValues, currentGroup)
collectMetricData(bk, newGroupValues, metricData)
interval = collectMetricData(bk, newGroupValues, metricData)
}
}
} else {
//non time series metric data
if len(k) < 5 {
if len(k) < 20 {
collectMetricDataItem(k, vm, &md, nil)
}
}
Expand All @@ -316,12 +334,14 @@ func collectMetricData(agg interface{}, groupValues []string, metricData *[]insi
}
}
}
return
}

// timeBeforeGroup => true
func collectMetricDataOther(agg interface{}, groupValues []string, metricData *[]insight.MetricData, timeKey interface{}) {
func collectMetricDataOther(agg interface{}, groupValues []string, metricData *[]insight.MetricData, timeKey interface{}) (interval string){
if aggM, ok := agg.(map[string]interface{}); ok {
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
interval, _ = timeBks["interval"].(string)
if bks, ok := timeBks["buckets"].([]interface{}); ok {
md := insight.MetricData{
Data: map[string][]insight.MetricDataItem{},
Expand All @@ -335,7 +355,7 @@ func collectMetricDataOther(agg interface{}, groupValues []string, metricData *[
}
if vm, ok := v.(map[string]interface{}); ok {
if vm["buckets"] != nil {
collectMetricDataOther(vm, groupValues, metricData, bkM["key"])
interval = collectMetricDataOther(vm, groupValues, metricData, bkM["key"])
} else {
collectMetricDataItem(k, vm, &md, bkM["key"])
}
Expand Down Expand Up @@ -363,15 +383,15 @@ func collectMetricDataOther(agg interface{}, groupValues []string, metricData *[
newGroupValues := make([]string, 0, len(groupValues)+1)
newGroupValues = append(newGroupValues, groupValues...)
newGroupValues = append(newGroupValues, currentGroup)
collectMetricDataOther(bk, newGroupValues, metricData, timeKey)
interval = collectMetricDataOther(bk, newGroupValues, metricData, timeKey)
}
}
} else {
//non time series metric data
for k, v := range aggM {
if vm, ok := v.(map[string]interface{}); ok {
if vm["buckets"] != nil {
collectMetricDataOther(vm, groupValues, metricData, timeKey)
interval = collectMetricDataOther(vm, groupValues, metricData, timeKey)
} else {
collectMetricDataItem(k, vm, &md, timeKey)
}
Expand All @@ -384,6 +404,7 @@ func collectMetricDataOther(agg interface{}, groupValues []string, metricData *[
}
}
}
return
}

func collectMetricDataItem(key string, vm map[string]interface{}, metricData *insight.MetricData, timeKey interface{}) {
Expand Down