Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/testrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,10 @@ func testRunnerSystemCommandAction(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
checkFailureStore, err := esClient.IsFailureStoreAvailable(ctx)
if err != nil {
return fmt.Errorf("can't check if failure store is available: %w", err)
}

if runTearDown || runTestsOnly {
if variantFlag != "" {
Expand All @@ -565,6 +569,7 @@ func testRunnerSystemCommandAction(cmd *cobra.Command, args []string) error {
PackageRootPath: packageRootPath,
KibanaClient: kibanaClient,
API: esClient.API,
ESClient: esClient,
ConfigFilePath: configFileFlag,
RunSetup: runSetup,
RunTearDown: runTearDown,
Expand All @@ -577,6 +582,7 @@ func testRunnerSystemCommandAction(cmd *cobra.Command, args []string) error {
GlobalTestConfig: globalTestConfig.System,
WithCoverage: testCoverage,
CoverageType: testCoverageFormat,
CheckFailureStore: checkFailureStore,
})

logger.Debugf("Running suite...")
Expand Down
115 changes: 5 additions & 110 deletions internal/dump/indextemplates.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,119 +6,14 @@ package dump

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"slices"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
)

// IndexTemplate contains information related to an index template for exporting purpouses.
// It contains a partially parsed index template and the original JSON from the response.
type IndexTemplate struct {
TemplateName string `json:"name"`
IndexTemplate struct {
Meta struct {
ManagedBy string `json:"managed_by"`
Managed bool `json:"managed"`
Package struct {
Name string `json:"name"`
} `json:"package"`
} `json:"_meta"`
ComposedOf []string `json:"composed_of"`
Template struct {
Settings TemplateSettings `json:"settings"`
} `json:"template"`
} `json:"index_template"`
type IndexTemplate = ingest.IndexTemplate
type TemplateSettings = ingest.TemplateSettings

raw json.RawMessage
}

// TemplateSettings are common settings to all kinds of templates.
type TemplateSettings struct {
Index struct {
DefaultPipeline string `json:"default_pipeline"`
FinalPipeline string `json:"final_pipeline"`
Lifecycle struct {
Name string `json:"name"`
} `json:"lifecycle"`
} `json:"index"`
}

// Name returns the name of the index template.
func (t IndexTemplate) Name() string {
return t.TemplateName
}

// JSON returns the JSON representation of the index template.
func (t IndexTemplate) JSON() []byte {
return []byte(t.raw)
}

// TemplateSettings returns the template settings of this template.
func (t IndexTemplate) TemplateSettings() TemplateSettings {
return t.IndexTemplate.Template.Settings
}

type getIndexTemplateResponse struct {
IndexTemplates []json.RawMessage `json:"index_templates"`
}

func getIndexTemplatesForPackage(ctx context.Context, api *elasticsearch.API, packageName string) ([]IndexTemplate, error) {
resp, err := api.Indices.GetIndexTemplate(
api.Indices.GetIndexTemplate.WithContext(ctx),

// Wildcard may be too wide, we will double check below if it is a managed template.
api.Indices.GetIndexTemplate.WithName(fmt.Sprintf("*-%s.*", packageName)),
)
if err != nil {
return nil, fmt.Errorf("failed to get index templates: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusNotFound {
// Some packages don't have index templates.
return nil, nil
}
if resp.IsError() {
return nil, fmt.Errorf("failed to get index templates: %s", resp.String())
}

d, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}

var templateResponse getIndexTemplateResponse
err = json.Unmarshal(d, &templateResponse)
if err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}

var indexTemplates []IndexTemplate
for _, indexTemplateRaw := range templateResponse.IndexTemplates {
var indexTemplate IndexTemplate
err = json.Unmarshal(indexTemplateRaw, &indexTemplate)
if err != nil {
return nil, fmt.Errorf("failed to parse index template: %w", err)
}
indexTemplate.raw = indexTemplateRaw

meta := indexTemplate.IndexTemplate.Meta
if meta.Package.Name != packageName || !managedByFleet(meta.ManagedBy) {
// This is not the droid you are looking for.
continue
}

indexTemplates = append(indexTemplates, indexTemplate)
}

return indexTemplates, nil
}

func managedByFleet(managedBy string) bool {
var managers = []string{"ingest-manager", "fleet"}
return slices.Contains(managers, managedBy)
func getIndexTemplatesForPackage(ctx context.Context, api *elasticsearch.API, packageName string) ([]ingest.IndexTemplate, error) {
return ingest.GetIndexTemplatesForPackage(ctx, api, packageName)
}
26 changes: 26 additions & 0 deletions internal/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,32 @@ func (client *Client) CheckHealth(ctx context.Context) error {
return nil
}

// IsFailureStoreAvailable checks if the failure store is available.
func (client *Client) IsFailureStoreAvailable(ctx context.Context) (bool, error) {
// FIXME: Using the low-level transport till the API SDK supports the failure store.
request, err := http.NewRequest(http.MethodGet, "/_search?failure_store=only", nil)
if err != nil {
return false, fmt.Errorf("failed to create search request: %w", err)
}
request.Header.Set("Content-Type", "application/json")

resp, err := client.Transport.Perform(request)
if err != nil {
return false, fmt.Errorf("failed to perform search request: %w", err)
}
defer resp.Body.Close()

switch resp.StatusCode {
case http.StatusOK:
return true, nil
case http.StatusBadRequest:
// Error expected when using an unrecognized parameter.
return false, nil
default:
return false, fmt.Errorf("unexpected status code received: %d", resp.StatusCode)
}
}

// redHealthCause tries to identify the cause of a cluster in red state. This could be
// also used as a replacement of CheckHealth, but keeping them separated because it uses
// internal undocumented APIs that might change.
Expand Down
79 changes: 79 additions & 0 deletions internal/elasticsearch/ingest/failurestorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
"bytes"
"context"
"encoding/json"
"fmt"

"github.com/elastic/elastic-package/internal/elasticsearch"
)

func EnableFailureStore(ctx context.Context, api *elasticsearch.API, indexTemplateName string, enabled bool) error {
resp, err := api.Indices.GetIndexTemplate(
api.Indices.GetIndexTemplate.WithContext(ctx),
api.Indices.GetIndexTemplate.WithName(indexTemplateName),
)
if err != nil {
return fmt.Errorf("failed to get index template %s: %w", indexTemplateName, err)
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to get index template %s: %s", indexTemplateName, resp.String())
}

var templateResponse struct {
IndexTemplates []struct {
IndexTemplate map[string]any `json:"index_template"`
} `json:"index_templates"`
}
err = json.NewDecoder(resp.Body).Decode(&templateResponse)
if err != nil {
return fmt.Errorf("failed to decode response while getting index template %s: %w", indexTemplateName, err)
}
if n := len(templateResponse.IndexTemplates); n != 1 {
return fmt.Errorf("unexpected number of index templates obtained while getting %s, expected 1, found %d", indexTemplateName, err)
}

template := templateResponse.IndexTemplates[0].IndexTemplate
dsSettings, found := template["data_stream"]
if found {
dsMap, ok := dsSettings.(map[string]any)
if !ok {
return fmt.Errorf("unexpected type for data stream settings in index template %s, expected map, found %T", indexTemplateName, dsMap)
}
if current, found := dsMap["failure_store"].(bool); found && current == enabled {
// Nothing to do, it already has the expected value.
return nil
}
dsMap["failure_store"] = enabled
template["data_stream"] = dsMap
} else {
template["data_stream"] = map[string]any{
"failure_store": enabled,
}
}

d, err := json.Marshal(template)
if err != nil {
return fmt.Errorf("failed to marshal template after updating it: %w", err)
}

updateResp, err := api.Indices.PutIndexTemplate(indexTemplateName, bytes.NewReader(d),
api.Indices.PutIndexTemplate.WithContext(ctx),
api.Indices.PutIndexTemplate.WithCreate(false),
)
if err != nil {
return fmt.Errorf("failed to update index template %s: %w", indexTemplateName, err)
}
defer updateResp.Body.Close()
if updateResp.IsError() {
return fmt.Errorf("failed to update index template %s: %s", indexTemplateName, resp.String())
}

return nil
}
88 changes: 88 additions & 0 deletions internal/elasticsearch/ingest/failurestorage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
"bytes"
"context"
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-package/internal/elasticsearch"
estest "github.com/elastic/elastic-package/internal/elasticsearch/test"
)

func TestEnableFailureStore(t *testing.T) {
client := estest.NewClient(t, "testdata/elasticsearch-8-enable-failure-store")

templateName := "ep-test-index-template"
templateBody := []byte(`{"index_patterns": ["metrics-eptest.failurestore-*"],"data_stream": {}}`)
createTempIndexTemplate(t, client.API, templateName, templateBody)
assertFailureStore(t, client.API, templateName, false)

err := EnableFailureStore(context.Background(), client.API, templateName, true)
assert.NoError(t, err)
assertFailureStore(t, client.API, templateName, true)

err = EnableFailureStore(context.Background(), client.API, templateName, false)
assert.NoError(t, err)
assertFailureStore(t, client.API, templateName, false)
}

func TestEnableFailureStoreNothingToDo(t *testing.T) {
client := estest.NewClient(t, "testdata/elasticsearch-8-enable-failure-store-noop")

templateName := "ep-test-index-template"
templateBody := []byte(`{"index_patterns": ["metrics-eptest.failurestore-*"],"data_stream": {"failure_store":true}}`)
createTempIndexTemplate(t, client.API, templateName, templateBody)
assertFailureStore(t, client.API, templateName, true)

err := EnableFailureStore(context.Background(), client.API, templateName, true)
assert.NoError(t, err)
assertFailureStore(t, client.API, templateName, true)
}

func createTempIndexTemplate(t *testing.T, api *elasticsearch.API, name string, body []byte) {
createResp, err := api.Indices.PutIndexTemplate(name, bytes.NewReader(body),
api.Indices.PutIndexTemplate.WithCreate(true),
)
require.NoError(t, err)
require.False(t, createResp.IsError(), createResp.String())
t.Cleanup(func() {
deleteResp, err := api.Indices.DeleteIndexTemplate(name)
require.NoError(t, err)
require.False(t, deleteResp.IsError())
})
}

func assertFailureStore(t *testing.T, api *elasticsearch.API, name string, expected bool) {
resp, err := api.Indices.GetIndexTemplate(
api.Indices.GetIndexTemplate.WithName(name),
)
require.NoError(t, err)
require.False(t, resp.IsError())
defer resp.Body.Close()

var templateResponse struct {
IndexTemplates []struct {
IndexTemplate struct {
DataStream struct {
FailureStore *bool `json:"failure_store"`
} `json:"data_stream"`
} `json:"index_template"`
} `json:"index_templates"`
}
err = json.NewDecoder(resp.Body).Decode(&templateResponse)
require.NoError(t, err)
require.Len(t, templateResponse.IndexTemplates, 1)
found := templateResponse.IndexTemplates[0].IndexTemplate.DataStream.FailureStore

if assert.NotNil(t, found) {
assert.Equal(t, expected, *found)
}
}
Loading