Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add helper to enable or disable failure store in index template
  • Loading branch information
jsoriano committed Jul 12, 2024
commit dfbb9511b9df1cd0711fd8695e438a6f9e91e510
75 changes: 75 additions & 0 deletions internal/elasticsearch/ingest/failurestorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
"bytes"
"context"
"encoding/json"
"fmt"

"github.com/elastic/elastic-package/internal/elasticsearch"
)

func EnableFailureStore(ctx context.Context, api *elasticsearch.API, indexTemplateName string, enabled bool) error {
resp, err := api.Indices.GetIndexTemplate(
api.Indices.GetIndexTemplate.WithContext(ctx),
api.Indices.GetIndexTemplate.WithName(indexTemplateName),
)
if err != nil {
return fmt.Errorf("failed to get index template %s: %w", indexTemplateName, err)
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to get index template %s: %s", indexTemplateName, resp.String())
}

var templateResponse struct {
IndexTemplates []struct {
IndexTemplate map[string]any `json:"index_template"`
} `json:"index_templates"`
}
err = json.NewDecoder(resp.Body).Decode(&templateResponse)
if err != nil {
return fmt.Errorf("failed to decode response while getting index template %s: %w", indexTemplateName, err)
}
if n := len(templateResponse.IndexTemplates); n != 1 {
return fmt.Errorf("unexpected number of index templates obtained while getting %s, expected 1, found %d", indexTemplateName, err)
}

template := templateResponse.IndexTemplates[0].IndexTemplate
dsSettings, found := template["data_stream"]
if found {
dsMap, ok := dsSettings.(map[string]any)
if !ok {
return fmt.Errorf("unexpected type for data stream settings in index template %s, expected map, found %T", indexTemplateName, dsMap)
}
dsMap["failure_store"] = enabled
template["data_stream"] = dsMap
} else {
template["data_stream"] = map[string]any{
"failure_store": enabled,
}
}

d, err := json.Marshal(template)
if err != nil {
return fmt.Errorf("failed to marshal template after updating it: %w", err)
}

updateResp, err := api.Indices.PutIndexTemplate(indexTemplateName, bytes.NewReader(d),
api.Indices.PutIndexTemplate.WithContext(ctx),
api.Indices.PutIndexTemplate.WithCreate(false),
)
if err != nil {
return fmt.Errorf("failed to update index template %s: %w", indexTemplateName, err)
}
defer updateResp.Body.Close()
if updateResp.IsError() {
return fmt.Errorf("failed to update index template %s: %s", indexTemplateName, resp.String())
}

return nil
}
75 changes: 75 additions & 0 deletions internal/elasticsearch/ingest/failurestorage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
"bytes"
"context"
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-package/internal/elasticsearch"
estest "github.com/elastic/elastic-package/internal/elasticsearch/test"
)

func TestEnableFailureStore(t *testing.T) {
client := estest.NewClient(t, "testdata/elasticsearch-9-enable-failure-store")

templateName := "ep-test-index-template"
templateBody := []byte(`{"index_patterns": ["metrics-eptest.failurestore-*"],"data_stream": {}}`)
createTempIndexTemplate(t, client.API, templateName, templateBody)
assertFailureStore(t, client.API, templateName, false)

err := EnableFailureStore(context.Background(), client.API, templateName, true)
assert.NoError(t, err)
assertFailureStore(t, client.API, templateName, true)

err = EnableFailureStore(context.Background(), client.API, templateName, false)
assert.NoError(t, err)
assertFailureStore(t, client.API, templateName, false)
}

func createTempIndexTemplate(t *testing.T, api *elasticsearch.API, name string, body []byte) {
createResp, err := api.Indices.PutIndexTemplate(name, bytes.NewReader(body),
api.Indices.PutIndexTemplate.WithCreate(true),
)
require.NoError(t, err)
require.False(t, createResp.IsError(), createResp.String())
t.Cleanup(func() {
deleteResp, err := api.Indices.DeleteIndexTemplate(name)
require.NoError(t, err)
require.False(t, deleteResp.IsError())
})
}

func assertFailureStore(t *testing.T, api *elasticsearch.API, name string, expected bool) {
resp, err := api.Indices.GetIndexTemplate(
api.Indices.GetIndexTemplate.WithName(name),
)
require.NoError(t, err)
require.False(t, resp.IsError())
defer resp.Body.Close()

var templateResponse struct {
IndexTemplates []struct {
IndexTemplate struct {
DataStream struct {
FailureStore *bool `json:"failure_store"`
} `json:"data_stream"`
} `json:"index_template"`
} `json:"index_templates"`
}
err = json.NewDecoder(resp.Body).Decode(&templateResponse)
require.NoError(t, err)
require.Len(t, templateResponse.IndexTemplates, 1)
found := templateResponse.IndexTemplates[0].IndexTemplate.DataStream.FailureStore

if assert.NotNil(t, found) {
assert.Equal(t, expected, *found)
}
}
Loading