Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Interrupt loop waiting docs if there are docs in the failure store
  • Loading branch information
jsoriano committed Jul 15, 2024
commit 22fa9b7f2a36d33a102c275b5dbe337a7773e5e1
42 changes: 36 additions & 6 deletions internal/testrunner/runners/system/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,15 @@ func (r *tester) getFailureStoreDocs(ctx context.Context, dataStream string) ([]
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
switch {
case resp.StatusCode == http.StatusNotFound:
// Can happen if the data stream hasn't been created yet.
return nil, nil
case resp.StatusCode == http.StatusServiceUnavailable:
// Index is being created, but no shards are available yet.
// See https://github.com/elastic/elasticsearch/issues/65846
return nil, nil
case resp.StatusCode >= 400:
return nil, fmt.Errorf("search request returned status code %d", resp.StatusCode)
}

Expand Down Expand Up @@ -1133,6 +1141,18 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf
return false, err
}

if r.checkFailureStore {
failureStore, err := r.getFailureStoreDocs(ctx, scenario.dataStream)
if err != nil {
return false, fmt.Errorf("failed to check failure store: %w", err)
}
if n := len(failureStore); n > 0 {
// Interrupt loop earlier if there are failures in the document store.
logger.Debugf("Found %d hits in the failure store for %s", len(failureStore), scenario.dataStream)
return true, nil
}
}

if config.Assert.HitCount > 0 {
if hits.size() < config.Assert.HitCount {
return false, nil
Expand Down Expand Up @@ -1325,7 +1345,15 @@ func (r *tester) createServiceStateDir() error {
func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.ResultComposer, scenario *scenarioTest, config *testConfig) ([]testrunner.TestResult, error) {
if r.checkFailureStore && len(scenario.failureStore) > 0 {
// TODO: Report failures found.
return result.WithErrorf("there are %d documents in the failure store", len(scenario.failureStore))
for _, doc := range scenario.failureStore {
d, err := json.MarshalIndent(doc, "", " ")
if err != nil {
return result.WithErrorf("failed to encode document from the failure store: %w", err)
}
logger.Debugf("Document found in the failure store: %s", string(d))
}
results, _ := result.WithErrorf("there are %d documents in the failure store", len(scenario.failureStore))
return results, nil
}

// Validate fields in docs
Expand Down Expand Up @@ -1375,7 +1403,7 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re
fields.WithDisableNormalization(scenario.syntheticEnabled),
)
if err != nil {
return result.WithError(fmt.Errorf("creating fields validator for data stream failed (path: %s): %w", r.dataStreamPath, err))
return result.WithErrorf("creating fields validator for data stream failed (path: %s): %w", r.dataStreamPath, err)
}
if err := validateFields(scenario.docs, fieldsValidator, scenario.dataStream); err != nil {
return result.WithError(err)
Expand All @@ -1390,13 +1418,14 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re
if scenario.syntheticEnabled {
docs, err = fieldsValidator.SanitizeSyntheticSourceDocs(scenario.docs)
if err != nil {
return result.WithError(fmt.Errorf("failed to sanitize synthetic source docs: %w", err))
results, _ := result.WithErrorf("failed to sanitize synthetic source docs: %w", err)
return results, nil
}
}

specVersion, err := semver.NewVersion(r.pkgManifest.SpecVersion)
if err != nil {
return result.WithError(fmt.Errorf("failed to parse format version %q: %w", r.pkgManifest.SpecVersion, err))
return result.WithErrorf("failed to parse format version %q: %w", r.pkgManifest.SpecVersion, err)
}

// Write sample events file from first doc, if requested
Expand All @@ -1411,7 +1440,8 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re

// Check transforms if present
if err := r.checkTransforms(ctx, config, r.pkgManifest, scenario.kibanaDataStream, scenario.dataStream); err != nil {
return result.WithError(err)
results, _ := result.WithError(err)
return results, nil
}

if scenario.agent != nil {
Expand Down