Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Test transforms
  • Loading branch information
jsoriano committed Nov 19, 2024
commit 559e15faeafb69ca65967878b4560e374eb561bf
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/creack/pty v1.1.19
github.com/dustin/go-humanize v1.0.1
github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/elastic/go-licenser v0.4.2
github.com/elastic/go-resource v0.2.0
github.com/elastic/go-ucfg v0.8.8
Expand Down Expand Up @@ -72,6 +72,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/gojsonschema v1.2.1 // indirect
github.com/elastic/kbncontent v0.1.4 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
Expand All @@ -84,6 +85,7 @@ require (
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/errors v0.20.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand Down Expand Up @@ -161,6 +163,9 @@ require (
github.com/xlab/treeprint v1.2.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.mongodb.org/mongo-driver v1.11.1 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 // indirect
Expand Down
17 changes: 15 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0 h1:sx1lpZuTG5suJuvgix4FWQFCLFFbzkoOmPoHWYOPLCY=
github.com/elastic/elastic-integration-corpus-generator-tool v0.10.0/go.mod h1:2/30n+2QRzRzus4TPVUV1T3U/j8g2ItUgvP0pcpjLGk=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.16.0 h1:f7bR+iBz8GTAVhwyFO3hm4ixsz2eMaEy0QroYnXV3jE=
github.com/elastic/go-elasticsearch/v8 v8.16.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64=
github.com/elastic/go-licenser v0.4.2 h1:bPbGm8bUd8rxzSswFOqvQh1dAkKGkgAmrPxbUi+Y9+A=
github.com/elastic/go-licenser v0.4.2/go.mod h1:W8eH6FaZDR8fQGm+7FnVa7MxI1b/6dAqxz+zPB8nm5c=
github.com/elastic/go-resource v0.2.0 h1:T92tw+THqISnCKaZBijlZMpEpCYkFkwsOgLQxKX6pqA=
Expand Down Expand Up @@ -138,8 +140,11 @@ github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMj
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/go-git/go-git/v5 v5.12.0 h1:7Md+ndsjrzZxbddRDZjF14qK+NN56sy6wkqaVrjZtys=
github.com/go-git/go-git/v5 v5.12.0/go.mod h1:FTM9VKtnI2m65hNI/TenDDDnUf2Q9FHnXYjuz9i5OEY=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/errors v0.20.2/go.mod h1:cM//ZKUKyO06HSwqAelJ5NsEMMcpa6VpXe8DOa1Mi1M=
Expand Down Expand Up @@ -427,6 +432,14 @@ github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ
go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=
go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8=
go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
4 changes: 2 additions & 2 deletions internal/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"net/http"
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"

"github.com/elastic/elastic-package/internal/certs"
)
Expand Down
161 changes: 161 additions & 0 deletions internal/testrunner/runners/system/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1896,6 +1896,12 @@ func (r *tester) checkTransforms(ctx context.Context, config *testConfig, pkgMan
return fmt.Errorf("no documents found in preview for transform %q", transformId)
}

// Check that there is no problem running the actual transform.
err = r.checkRunningTransformHealth(ctx, transformId)
if err != nil {
return fmt.Errorf("there are issues with installed transform %q: %w", transformId, err)
}

transformRootPath := filepath.Dir(transform.Path)
fieldsValidator, err := fields.CreateValidatorForDirectory(transformRootPath,
fields.WithSpecVersion(pkgManifest.SpecVersion),
Expand Down Expand Up @@ -1978,6 +1984,161 @@ func (r *tester) previewTransform(ctx context.Context, transformId string) ([]co
return preview.Documents, nil
}

func (r *tester) scheduleTransform(ctx context.Context, transformId string) error {
resp, err := r.esAPI.TransformScheduleNowTransform(transformId,
r.esAPI.TransformScheduleNowTransform.WithContext(ctx),
)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.IsError() {
return fmt.Errorf("failed to schedule transform %q: %s", transformId, resp.String())
}

return nil
}

type transformStats struct {
Checkpointing struct {
Last struct {
Checkpoint int `json:"checkpoint"`
} `json:"last"`
Next struct {
Checkpoint int `json:"checkpoint"`
} `json:"next"`
LastSearchTime int `json:"last_search_time"`
} `json:"checkpointing"`
Health transformHealth `json:"health"`
Reason string `json:"reason"`
State string `json:"state"`
}

type transformHealth struct {
Status string `json:"status"`
Issues []struct {
Issue string `json:"issue"`
Details string `json:"details"`
Count int `json:"count"`
FirstOccurrence time.Time `json:"first_occurrence"`
} `json:"issues"`
}

func (r *tester) getTransformStats(ctx context.Context, transformId string) (*transformStats, error) {
resp, err := r.esAPI.TransformGetTransformStats(transformId,
r.esAPI.TransformGetTransformStats.WithContext(ctx),
r.esAPI.TransformGetTransformStats.WithAllowNoMatch(false),
)

if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.IsError() {
return nil, fmt.Errorf("failed to get transform stats for %q: %s", transformId, resp.String())
}

var response struct {
Transforms []transformStats `json:"transforms"`
}
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
if len(response.Transforms) != 1 {
return nil, fmt.Errorf("stats for %d transforms received when requesting only for %s", len(response.Transforms), transformId)
}
return &response.Transforms[0], nil
}

// checkRunningTransformHealth checks the following for a given transform:
// - That it is started.
// - That it can execute at least once during the check.
// - That it is healthy after executing at least once.
func (r *tester) checkRunningTransformHealth(ctx context.Context, transformId string) error {
const (
period = 1 * time.Second
timeout = 60 * time.Second
)
lastSearchTime := 0
last := -1
running := false
ok, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
stats, err := r.getTransformStats(ctx, transformId)
if err != nil {
return false, err
}
if last < 0 {
last = stats.Checkpointing.Last.Checkpoint
lastSearchTime = stats.Checkpointing.LastSearchTime
}
logger.Debugf("transform %s state: %s, health: %s, checkpoint %d, last search %d",
transformId,
stats.State, stats.Health.Status,
stats.Checkpointing.Last.Checkpoint, stats.Checkpointing.LastSearchTime)
switch stats.State {
case "failed":
return false, fmt.Errorf("transform in failed state: %s", stats.Reason)
case "aborting", "stopping", "stopped":
return false, fmt.Errorf("transform unexpectedly %s", stats.State)
case "indexing":
// It is already running, wait till indexing finishes.
running = true
return false, nil
case "started":
if !running {
logger.Debugf("scheduling transform %s now", transformId)
err := r.scheduleTransform(ctx, transformId)
if err != nil {
return false, fmt.Errorf("failed to schedule transform: %w", err)
}
running = true
return false, nil
}
default:
return false, fmt.Errorf("unexpected transform state %q", stats.State)
}

if stats.Checkpointing.Last.Checkpoint <= last && stats.Checkpointing.LastSearchTime <= lastSearchTime {
// There hasn't been any update yet, try again.
return false, nil
}

err = healthError(stats.Health)
return err == nil, err
}, period, timeout)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("could not confirm successful executions of transform %s", transformId)
}
return nil
}

func healthError(health transformHealth) error {
if health.Status == "green" {
return nil
}

var msg strings.Builder
msg.WriteString("unexpected transform health status (" + health.Status + ")")

if len(health.Issues) > 0 {
msg.WriteString(": ")
for i, issue := range health.Issues {
msg.WriteString(issue.Issue + "(" + issue.Details + ")")
if i+1 < len(health.Issues) {
msg.WriteString(", ")
}
}
}

return errors.New(msg.String())
}

func filterAgents(allAgents []kibana.Agent, svcInfo servicedeployer.ServiceInfo) []kibana.Agent {
if svcInfo.Agent.Host.NamePrefix != "" {
logger.Debugf("filter agents using criteria: NamePrefix=%s", svcInfo.Agent.Host.NamePrefix)
Expand Down