diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index 748edf0eb0..cecda638a2 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -189,6 +189,37 @@ func (client *Client) CheckHealth(ctx context.Context) error { return nil } +type Info struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + Version struct { + Number string `json:"number"` + BuildFlavor string `json:"build_flavor"` + } `json:"version` +} + +// Info gets cluster information and metadata. +func (client *Client) Info(ctx context.Context) (*Info, error) { + resp, err := client.Client.Info(client.Client.Info.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("error getting cluster info: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to get cluster info: %s", resp.String()) + } + + var info Info + err = json.NewDecoder(resp.Body).Decode(&info) + if err != nil { + return nil, fmt.Errorf("error decoding cluster info: %w", err) + } + + return &info, nil +} + // IsFailureStoreAvailable checks if the failure store is available. func (client *Client) IsFailureStoreAvailable(ctx context.Context) (bool, error) { // FIXME: Using the low-level transport till the API SDK supports the failure store. diff --git a/internal/elasticsearch/client_test.go b/internal/elasticsearch/client_test.go index a73e2c4489..40ccdcc135 100644 --- a/internal/elasticsearch/client_test.go +++ b/internal/elasticsearch/client_test.go @@ -93,6 +93,13 @@ func TestClusterHealth(t *testing.T) { } } +func TestClusterInfo(t *testing.T) { + client := test.NewClient(t, "./testdata/elasticsearch-9-info") + info, err := client.Info(context.Background()) + require.NoError(t, err) + assert.Equal(t, "9.0.0-SNAPSHOT", info.Version.Number) +} + func writeCACertFile(t *testing.T, cert *x509.Certificate) string { var d bytes.Buffer err := pem.Encode(&d, &pem.Block{ diff --git a/internal/elasticsearch/testdata/elasticsearch-9-info.yaml b/internal/elasticsearch/testdata/elasticsearch-9-info.yaml new file mode 100644 index 0000000000..05a5f1dab8 --- /dev/null +++ b/internal/elasticsearch/testdata/elasticsearch-9-info.yaml @@ -0,0 +1,119 @@ +--- +version: 2 +interactions: + - id: 0 + request: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + content_length: 0 + transfer_encoding: [] + trailer: {} + host: "" + remote_addr: "" + request_uri: "" + body: "" + form: {} + headers: + Authorization: + - Basic ZWxhc3RpYzpjaGFuZ2VtZQ== + User-Agent: + - go-elasticsearch/7.17.10 (linux amd64; Go 1.23.4) + X-Elastic-Client-Meta: + - es=7.17.10,go=1.23.4,t=7.17.10,hc=1.23.4 + url: https://127.0.0.1:9200/ + method: GET + response: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + transfer_encoding: [] + trailer: {} + content_length: 547 + uncompressed: false + body: | + { + "name" : "da1c4d2b1379", + "cluster_name" : "elasticsearch", + "cluster_uuid" : "KPcvXrW1Rnega2GT0sn3mg", + "version" : { + "number" : "9.0.0-SNAPSHOT", + "build_flavor" : "default", + "build_type" : "docker", + "build_hash" : "873dc52360a9265824a70b3113c3dd350ff9249a", + "build_date" : "2025-01-13T13:20:37.908330789Z", + "build_snapshot" : true, + "lucene_version" : "10.0.0", + "minimum_wire_compatibility_version" : "8.18.0", + "minimum_index_compatibility_version" : "8.0.0" + }, + "tagline" : "You Know, for Search" + } + headers: + Content-Length: + - "547" + Content-Type: + - application/json + X-Elastic-Product: + - Elasticsearch + status: 200 OK + code: 200 + duration: 3.494911ms + - id: 1 + request: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + content_length: 0 + transfer_encoding: [] + trailer: {} + host: "" + remote_addr: "" + request_uri: "" + body: "" + form: {} + headers: + Authorization: + - Basic ZWxhc3RpYzpjaGFuZ2VtZQ== + User-Agent: + - go-elasticsearch/7.17.10 (linux amd64; Go 1.23.4) + X-Elastic-Client-Meta: + - es=7.17.10,go=1.23.4,t=7.17.10,hc=1.23.4 + url: https://127.0.0.1:9200/ + method: GET + response: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + transfer_encoding: [] + trailer: {} + content_length: 547 + uncompressed: false + body: | + { + "name" : "da1c4d2b1379", + "cluster_name" : "elasticsearch", + "cluster_uuid" : "KPcvXrW1Rnega2GT0sn3mg", + "version" : { + "number" : "9.0.0-SNAPSHOT", + "build_flavor" : "default", + "build_type" : "docker", + "build_hash" : "873dc52360a9265824a70b3113c3dd350ff9249a", + "build_date" : "2025-01-13T13:20:37.908330789Z", + "build_snapshot" : true, + "lucene_version" : "10.0.0", + "minimum_wire_compatibility_version" : "8.18.0", + "minimum_index_compatibility_version" : "8.0.0" + }, + "tagline" : "You Know, for Search" + } + headers: + Content-Length: + - "547" + Content-Type: + - application/json + X-Elastic-Product: + - Elasticsearch + status: 200 OK + code: 200 + duration: 474.3µs diff --git a/internal/fleetserver/client.go b/internal/fleetserver/client.go new file mode 100644 index 0000000000..98dfd7f7b3 --- /dev/null +++ b/internal/fleetserver/client.go @@ -0,0 +1,128 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetserver + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/elastic/elastic-package/internal/certs" + "github.com/elastic/elastic-package/internal/logger" +) + +// Client is a client for Fleet Server API. This API only supports authentication with API +// keys, though some endpoints are also available without any authentication. +type Client struct { + address string + apiKey string + + certificateAuthority string + tlSkipVerify bool + + http *http.Client + httpClientSetup func(*http.Client) *http.Client +} + +type ClientOption func(*Client) + +func NewClient(address string, opts ...ClientOption) (*Client, error) { + client := Client{ + address: address, + } + + for _, opt := range opts { + opt(&client) + } + + httpClient, err := client.httpClient() + if err != nil { + return nil, fmt.Errorf("cannot create HTTP client: %w", err) + } + client.http = httpClient + return &client, nil +} + +// APIKey option sets the API key to be used by the client for authentication. +func APIKey(apiKey string) ClientOption { + return func(c *Client) { + c.apiKey = apiKey + } +} + +// TLSSkipVerify option disables TLS verification. +func TLSSkipVerify() ClientOption { + return func(c *Client) { + c.tlSkipVerify = true + } +} + +// CertificateAuthority sets the certificate authority to be used by the client. +func CertificateAuthority(certificateAuthority string) ClientOption { + return func(c *Client) { + c.certificateAuthority = certificateAuthority + } +} + +// HTTPClientSetup adds an initializing function for the http client. +func HTTPClientSetup(setup func(*http.Client) *http.Client) ClientOption { + return func(c *Client) { + c.httpClientSetup = setup + } +} + +func (c *Client) httpClient() (*http.Client, error) { + client := &http.Client{} + if c.tlSkipVerify { + client.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } else if c.certificateAuthority != "" { + rootCAs, err := certs.SystemPoolWithCACertificate(c.certificateAuthority) + if err != nil { + return nil, fmt.Errorf("reading CA certificate: %w", err) + } + client.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: rootCAs}, + } + } + + if c.httpClientSetup != nil { + client = c.httpClientSetup(client) + } + + return client, nil +} + +func (c *Client) httpRequest(ctx context.Context, method, resourcePath string, reqBody io.Reader) (*http.Request, error) { + base, err := url.Parse(c.address) + if err != nil { + return nil, fmt.Errorf("could not create base URL from host: %v: %w", c.address, err) + } + + rel, err := url.Parse(resourcePath) + if err != nil { + return nil, fmt.Errorf("could not create relative URL from resource path: %v: %w", resourcePath, err) + } + + u := base.JoinPath(rel.EscapedPath()) + u.RawQuery = rel.RawQuery + + logger.Debugf("%s %s", method, u) + + req, err := http.NewRequestWithContext(ctx, method, u.String(), reqBody) + if err != nil { + return nil, fmt.Errorf("could not create %v request to Fleet Server API resource: %s: %w", method, resourcePath, err) + } + + if c.apiKey != "" { + req.Header.Set("Authorization", "ApiKey "+c.apiKey) + } + + return req, nil +} diff --git a/internal/fleetserver/status.go b/internal/fleetserver/status.go new file mode 100644 index 0000000000..59a92d38c1 --- /dev/null +++ b/internal/fleetserver/status.go @@ -0,0 +1,56 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetserver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/url" + + "github.com/elastic/elastic-package/internal/logger" +) + +type Status struct { + Name string `json:"name"` + Status string `json:"status"` + + // Version is only present if client is authenticated. + Version struct { + Number string `json:"number"` + } `json:"version"` +} + +func (c *Client) Status(ctx context.Context) (*Status, error) { + statusURL, err := url.JoinPath(c.address, "/api/status") + if err != nil { + return nil, fmt.Errorf("could not build URL: %w", err) + } + logger.Debugf("GET %s", statusURL) + req, err := c.httpRequest(ctx, "GET", statusURL, nil) + if err != nil { + return nil, err + } + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed (url: %s): %w", statusURL, err) + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("unexpected status code %v", resp.StatusCode) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + var status Status + err = json.Unmarshal(body, &status) + if err != nil { + return nil, fmt.Errorf("failed to parse response body: %w", err) + } + + return &status, nil +} diff --git a/internal/fleetserver/status_test.go b/internal/fleetserver/status_test.go new file mode 100644 index 0000000000..278d99e3a3 --- /dev/null +++ b/internal/fleetserver/status_test.go @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetserver_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-package/internal/fleetserver" + fleetservertest "github.com/elastic/elastic-package/internal/fleetserver/test" +) + +func TestStatusAPIKeyAuthenticated(t *testing.T) { + client := fleetservertest.NewClient(t, + "./testdata/status-authenticated", + "https://localhost:8220", + fleetserver.APIKey("V2R2NVlKUUJtbFRXZFZHaTB5c1U6aFpqdU1udWpUajZBR1FPUUNRRGdWZw=="), + fleetserver.TLSSkipVerify(), + ) + + status, err := client.Status(context.Background()) + require.NoError(t, err) + + assert.Equal(t, status.Name, "fleet-server") + assert.Equal(t, status.Status, "HEALTHY") + assert.NotEmpty(t, status.Version.Number) +} + +func TestStatusUnauthenticated(t *testing.T) { + client := fleetservertest.NewClient(t, + "./testdata/status-unauthenticated", + "https://localhost:8220", + fleetserver.TLSSkipVerify(), + ) + + status, err := client.Status(context.Background()) + require.NoError(t, err) + + assert.Equal(t, status.Name, "fleet-server") + assert.Equal(t, status.Status, "HEALTHY") +} diff --git a/internal/fleetserver/test/httptest.go b/internal/fleetserver/test/httptest.go new file mode 100644 index 0000000000..d265012573 --- /dev/null +++ b/internal/fleetserver/test/httptest.go @@ -0,0 +1,51 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package test + +import ( + "net/http" + "os" + "testing" + + "github.com/stretchr/testify/require" + "gopkg.in/dnaeon/go-vcr.v3/cassette" + "gopkg.in/dnaeon/go-vcr.v3/recorder" + + "github.com/elastic/elastic-package/internal/fleetserver" +) + +// NewClient returns a client for a testing http server that uses prerecorded +// responses. If responses are not found, it forwards the query to the server started by +// elastic-package stack, and records the response. +// Responses are recorded in the directory indicated by serverDataDir. +func NewClient(t *testing.T, recordFileName string, host string, options ...fleetserver.ClientOption) *fleetserver.Client { + setupHTTPClient := func(client *http.Client) *http.Client { + rec, err := recorder.NewWithOptions(&recorder.Options{ + CassetteName: recordFileName, + Mode: recorder.ModeRecordOnce, + SkipRequestLatency: true, + RealTransport: client.Transport, + }) + require.NoError(t, err) + t.Cleanup(func() { + err := rec.Stop() + require.NoError(t, err) + }) + return rec.GetDefaultClient() + } + + _, err := os.Stat(cassette.New(recordFileName).File) + if err == nil { + host = "https://localhost:8220" + options = nil + } + + options = append(options, fleetserver.HTTPClientSetup(setupHTTPClient)) + + client, err := fleetserver.NewClient(host, options...) + require.NoError(t, err) + + return client +} diff --git a/internal/fleetserver/testdata/status-authenticated.yaml b/internal/fleetserver/testdata/status-authenticated.yaml new file mode 100644 index 0000000000..1907c7bb51 --- /dev/null +++ b/internal/fleetserver/testdata/status-authenticated.yaml @@ -0,0 +1,44 @@ +--- +version: 2 +interactions: + - id: 0 + request: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + content_length: 0 + transfer_encoding: [] + trailer: {} + host: localhost:8220 + remote_addr: "" + request_uri: "" + body: "" + form: {} + headers: + Authorization: + - ApiKey V2R2NVlKUUJtbFRXZFZHaTB5c1U6aFpqdU1udWpUajZBR1FPUUNRRGdWZw== + url: https://localhost:8220/api/status + method: GET + response: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + transfer_encoding: [] + trailer: {} + content_length: 140 + uncompressed: false + body: '{"name":"fleet-server","status":"HEALTHY","version":{"build_hash":"51a602de","build_time":"2025-01-10T18:41:11Z","number":"9.0.0-SNAPSHOT"}}' + headers: + Content-Length: + - "140" + Content-Type: + - application/json + Date: + - Mon, 13 Jan 2025 18:43:34 GMT + Elastic-Api-Version: + - "2023-06-01" + X-Request-Id: + - 05fcfd8c-470e-4178-91fc-e989f80c3b4c + status: 200 OK + code: 200 + duration: 1.763197ms diff --git a/internal/fleetserver/testdata/status-unauthenticated.yaml b/internal/fleetserver/testdata/status-unauthenticated.yaml new file mode 100644 index 0000000000..3efcc8bf3a --- /dev/null +++ b/internal/fleetserver/testdata/status-unauthenticated.yaml @@ -0,0 +1,44 @@ +--- +version: 2 +interactions: + - id: 0 + request: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + content_length: 0 + transfer_encoding: [] + trailer: {} + host: localhost:8220 + remote_addr: "" + request_uri: "" + body: "" + form: {} + headers: + Authorization: + - Basic Og== + url: https://localhost:8220/api/status + method: GET + response: + proto: HTTP/1.1 + proto_major: 1 + proto_minor: 1 + transfer_encoding: [] + trailer: {} + content_length: 42 + uncompressed: false + body: '{"name":"fleet-server","status":"HEALTHY"}' + headers: + Content-Length: + - "42" + Content-Type: + - application/json + Date: + - Mon, 13 Jan 2025 18:43:34 GMT + Elastic-Api-Version: + - "2023-06-01" + X-Request-Id: + - 363c794e-7115-4648-8129-b3bf85dbf8bc + status: 200 OK + code: 200 + duration: 1.31018ms diff --git a/internal/kibana/agents.go b/internal/kibana/agents.go index 853a2520ee..cd56df6359 100644 --- a/internal/kibana/agents.go +++ b/internal/kibana/agents.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "net/http" + "net/url" "time" "github.com/elastic/elastic-package/internal/logger" @@ -49,7 +50,18 @@ func (a *Agent) String() string { // ListAgents returns the list of agents enrolled with Fleet. func (c *Client) ListAgents(ctx context.Context) ([]Agent, error) { - statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents", FleetAPI)) + return c.QueryAgents(ctx, "") +} + +// QueryAgents returns the list of agents enrolled with Fleet that satisfy a kibana query. +func (c *Client) QueryAgents(ctx context.Context, kuery string) ([]Agent, error) { + resource := fmt.Sprintf("%s/agents", FleetAPI) + if kuery != "" { + values := make(url.Values) + values.Set("kuery", kuery) + resource += "?" + values.Encode() + } + statusCode, respBody, err := c.get(ctx, resource) if err != nil { return nil, fmt.Errorf("could not list agents: %w", err) } @@ -68,7 +80,7 @@ func (c *Client) ListAgents(ctx context.Context) ([]Agent, error) { } switch { - case c.semver.Major() < 9: + case c.semver != nil && c.semver.Major() < 9: return resp.List, nil default: return resp.Items, nil @@ -84,7 +96,7 @@ func (c *Client) AssignPolicyToAgent(ctx context.Context, a Agent, p Policy) err var err error var respBody []byte switch { - case c.semver.Major() < 9: + case c.semver != nil && c.semver.Major() < 9: statusCode, respBody, err = c.put(ctx, path, []byte(reqBody)) default: statusCode, respBody, err = c.post(ctx, path, []byte(reqBody)) diff --git a/internal/kibana/client.go b/internal/kibana/client.go index 74e7dd9bff..02ef00cda7 100644 --- a/internal/kibana/client.go +++ b/internal/kibana/client.go @@ -22,7 +22,10 @@ import ( "github.com/elastic/elastic-package/internal/retry" ) -var ErrUndefinedHost = errors.New("missing kibana host") +var ( + ErrUndefinedHost = errors.New("missing kibana host") + ErrConflict = errors.New("resource already exists") +) // Client is responsible for exporting dashboards from Kibana. type Client struct { @@ -74,9 +77,12 @@ func NewClient(opts ...ClientOption) (*Client, error) { } c.versionInfo = v.Version - c.semver, err = semver.NewVersion(c.versionInfo.Number) - if err != nil { - return nil, fmt.Errorf("failed to parse Kibana version (%s): %w", c.versionInfo.Number, err) + // Version info may not contain any version if this is a managed Kibana. + if c.versionInfo.Number != "" { + c.semver, err = semver.NewVersion(c.versionInfo.Number) + if err != nil { + return nil, fmt.Errorf("failed to parse Kibana version (%s): %w", c.versionInfo.Number, err) + } } } diff --git a/internal/kibana/enrollmenttokens.go b/internal/kibana/enrollmenttokens.go new file mode 100644 index 0000000000..558c54fe32 --- /dev/null +++ b/internal/kibana/enrollmenttokens.go @@ -0,0 +1,110 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kibana + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" +) + +type EnrollmentToken struct { + Active bool `json:"active"` + APIKey string `json:"api_key"` + ID string `json:"id"` + Name string `json:"name"` + PolicyID string `json:"policy_id"` +} + +// GetEnrollmentTokenForPolicyID returns an active enrollment token for a given policy ID. +// It obtains the token by returning one of the list of active tokens for this policy, or +// requesting one if there are none. +func (c *Client) GetEnrollmentTokenForPolicyID(ctx context.Context, policyID string) (string, error) { + kuery := fmt.Sprintf("active:true and policy_id:%s", policyID) + tokens, err := c.getEnrollmentTokens(ctx, kuery) + if err != nil { + return "", err + } + if len(tokens) == 0 { + token, err := c.requestEnrollmentToken(ctx, policyID) + if err != nil { + return "", fmt.Errorf("no active enrollment token found for policy %s and failed to request one: %w", policyID, err) + } + if !token.Active { + return "", fmt.Errorf("requested token %s is not active, this should not happen", token.ID) + } + return token.APIKey, nil + } + + // API sorts tokens by creation date in descending order, so the first one is + // the newest, return it. + return tokens[0].APIKey, nil +} + +func (c *Client) getEnrollmentTokens(ctx context.Context, kuery string) ([]EnrollmentToken, error) { + var tokens []EnrollmentToken + var resp struct { + List []EnrollmentToken `json:"list"` + Items []EnrollmentToken `json:"items"` + Total int `json:"total"` + Page int `json:"page"` + PerPage int `json:"perPage` + } + for { + values := make(url.Values) + values.Set("page", strconv.Itoa(resp.Page+1)) + values.Set("kuery", kuery) + resource := fmt.Sprintf("%s/enrollment_api_keys?%s", FleetAPI, values.Encode()) + statusCode, respBody, err := c.get(ctx, resource) + if err != nil { + return nil, fmt.Errorf("could not get enrollment tokens (query: %q): %w", values.Encode(), err) + } + if statusCode != http.StatusOK { + return nil, fmt.Errorf("could not get enrollment tokens (query: %q; API status code = %d; response body = %s", values.Encode(), statusCode, respBody) + } + + if err := json.Unmarshal(respBody, &resp); err != nil { + return nil, fmt.Errorf("could not decode response to get enrollment tokens: %w", err) + } + + // Tokens are listed twice, at least on some versions, get only one copy of them. + if len(resp.List) > 0 { + tokens = append(tokens, resp.List...) + } else if len(resp.Items) > 0 { + tokens = append(tokens, resp.Items...) + } + + if resp.Page*resp.PerPage >= resp.Total { + break + } + } + + return tokens, nil +} + +func (c *Client) requestEnrollmentToken(ctx context.Context, policyID string) (*EnrollmentToken, error) { + reqBody := fmt.Sprintf(`{"policy_id":"%s"}`, policyID) + resource := fmt.Sprintf("%s/enrollment_api_keys", FleetAPI) + statusCode, respBody, err := c.post(ctx, resource, []byte(reqBody)) + if err != nil { + return nil, fmt.Errorf("could not request enrollment token: %w", err) + } + if statusCode != http.StatusOK { + return nil, fmt.Errorf("could not request enrollment token (API status code = %d; response body = %s", statusCode, respBody) + } + + var resp struct { + Item EnrollmentToken `json:"item"` + Action string `json:"action"` + } + if err := json.Unmarshal(respBody, &resp); err != nil { + return nil, fmt.Errorf("could not decode response to request for enrollment token: %w", err) + } + + return &resp.Item, nil +} diff --git a/internal/kibana/fleet.go b/internal/kibana/fleet.go index 5b81e52dfa..dda08b4554 100644 --- a/internal/kibana/fleet.go +++ b/internal/kibana/fleet.go @@ -21,12 +21,23 @@ type FleetOutput struct { SSL *AgentSSL `json:"ssl,omitempty"` } +type FleetServerHost struct { + ID string `json:"id,omitempty"` + URLs []string `json:"host_urls"` + Name string `json:"name"` + + // TODO: Avoid using is_default, so a cluster can be used for multiple environments. + IsDefault bool `json:"is_default"` +} + type AgentSSL struct { CertificateAuthorities []string `json:"certificate_authorities,omitempty"` Certificate string `json:"certificate,omitempty"` Key string `json:"key,omitempty"` } +var ErrFleetServerNotFound = errors.New("could not find a fleet server URL") + // DefaultFleetServerURL returns the default Fleet server configured in Kibana func (c *Client) DefaultFleetServerURL(ctx context.Context) (string, error) { path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI) @@ -57,7 +68,7 @@ func (c *Client) DefaultFleetServerURL(ctx context.Context) (string, error) { } } - return "", errors.New("could not find the fleet server URL for this project") + return "", ErrFleetServerNotFound } // UpdateFleetOutput updates an existing output to fleet @@ -92,6 +103,9 @@ func (c *Client) AddFleetOutput(ctx context.Context, fo FleetOutput) error { return fmt.Errorf("could not create fleet output: %w", err) } + if statusCode == http.StatusConflict { + return fmt.Errorf("could not add fleet output: %w", ErrConflict) + } if statusCode != http.StatusOK { return fmt.Errorf("could not add fleet output; API status code = %d; response body = %s", statusCode, respBody) } @@ -99,6 +113,24 @@ func (c *Client) AddFleetOutput(ctx context.Context, fo FleetOutput) error { return nil } +// RemoveFleetOutput removes an output from Fleet +func (c *Client) RemoveFleetOutput(ctx context.Context, outputID string) error { + statusCode, respBody, err := c.delete(ctx, fmt.Sprintf("%s/outputs/%s", FleetAPI, outputID)) + if err != nil { + return fmt.Errorf("could not delete fleet output: %w", err) + } + + if statusCode == http.StatusNotFound { + // Already removed, ignore error. + return nil + } + if statusCode != http.StatusOK { + return fmt.Errorf("could not remove fleet output; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil +} + func (c *Client) SetAgentLogLevel(ctx context.Context, agentID, level string) error { path := fmt.Sprintf("%s/agents/%s/actions", FleetAPI, agentID) @@ -147,3 +179,71 @@ func (c *Client) SetAgentLogLevel(ctx context.Context, agentID, level string) er } return nil } + +func (c *Client) AddFleetServerHost(ctx context.Context, host FleetServerHost) error { + reqBody, err := json.Marshal(host) + if err != nil { + return fmt.Errorf("could not convert fleet server host to JSON: %w", err) + } + + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/fleet_server_hosts", FleetAPI), reqBody) + if err != nil { + return fmt.Errorf("could not add fleet server host: %w", err) + } + + if statusCode == http.StatusConflict { + return fmt.Errorf("could not add fleet server host: %w", ErrConflict) + } + if statusCode != http.StatusOK { + return fmt.Errorf("could not add fleet server host; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil +} + +func (c *Client) UpdateFleetServerHost(ctx context.Context, host FleetServerHost) error { + if host.ID == "" { + return fmt.Errorf("host id required when updating fleet server host") + } + + // Payload should not contain the ID, it is set in the URL. + id := host.ID + host.ID = "" + reqBody, err := json.Marshal(host) + if err != nil { + return fmt.Errorf("could not convert fleet server host to JSON: %w", err) + } + + statusCode, respBody, err := c.put(ctx, fmt.Sprintf("%s/fleet_server_hosts/%s", FleetAPI, id), reqBody) + if err != nil { + return fmt.Errorf("could not update fleet server host: %w", err) + } + + if statusCode != http.StatusOK { + return fmt.Errorf("could not update fleet server host; API status code = %d; response body = %s", statusCode, respBody) + } + + return nil +} + +// CreateFleetServiceToken creates a service token for Fleet, to be used when enrolling Fleet Servers. +func (c *Client) CreateFleetServiceToken(ctx context.Context) (string, error) { + statusCode, respBody, err := c.post(ctx, fmt.Sprintf("%s/service_tokens", FleetAPI), nil) + if err != nil { + return "", fmt.Errorf("could not request fleet service token: %w", err) + } + + if statusCode != http.StatusOK { + return "", fmt.Errorf("could not request fleet service token; API status code = %d; response body = %s", statusCode, respBody) + } + + var resp struct { + Name string `json:"name"` + Value string `json:"value"` + } + if err := json.Unmarshal(respBody, &resp); err != nil { + return "", fmt.Errorf("could not convert actions agent (response) to JSON: %w", err) + } + + return resp.Value, nil +} diff --git a/internal/kibana/packages.go b/internal/kibana/packages.go index 40b40434e1..34fcf7a079 100644 --- a/internal/kibana/packages.go +++ b/internal/kibana/packages.go @@ -142,7 +142,7 @@ func (c *Client) epmPackageUrl(name, version string) string { return fmt.Sprintf("%s/epm/packages/%s", FleetAPI, name) } switch { - case c.semver.Major() < 8: + case c.semver != nil && c.semver.Major() < 8: return fmt.Sprintf("%s/epm/packages/%s-%s", FleetAPI, name, version) default: return fmt.Sprintf("%s/epm/packages/%s/%s", FleetAPI, name, version) diff --git a/internal/kibana/policies.go b/internal/kibana/policies.go index f963833d9a..40209bdfbe 100644 --- a/internal/kibana/policies.go +++ b/internal/kibana/policies.go @@ -15,14 +15,15 @@ import ( // Policy represents an Agent Policy in Fleet. type Policy struct { - ID string `json:"id,omitempty"` - Name string `json:"name"` - Description string `json:"description"` - Namespace string `json:"namespace"` - Revision int `json:"revision,omitempty"` - MonitoringEnabled []string `json:"monitoring_enabled,omitempty"` - MonitoringOutputID string `json:"monitoring_output_id,omitempty"` - DataOutputID string `json:"data_output_id,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name"` + Description string `json:"description"` + Namespace string `json:"namespace"` + Revision int `json:"revision,omitempty"` + MonitoringEnabled []string `json:"monitoring_enabled,omitempty"` + MonitoringOutputID string `json:"monitoring_output_id,omitempty"` + DataOutputID string `json:"data_output_id,omitempty"` + IsDefaultFleetServer bool `json:"is_default_fleet_server,omitempty"` } // DownloadedPolicy represents a policy as returned by the download policy API. @@ -40,6 +41,9 @@ func (c *Client) CreatePolicy(ctx context.Context, p Policy) (*Policy, error) { return nil, fmt.Errorf("could not create policy: %w", err) } + if statusCode == http.StatusConflict { + return nil, fmt.Errorf("could not create policy: %w", ErrConflict) + } if statusCode != http.StatusOK { return nil, fmt.Errorf("could not create policy; API status code = %d; response body = %s", statusCode, respBody) } diff --git a/internal/kibana/status.go b/internal/kibana/status.go index 2e16afb9da..3a8ad75e21 100644 --- a/internal/kibana/status.go +++ b/internal/kibana/status.go @@ -16,6 +16,7 @@ const SNAPSHOT_SUFFIX = "-SNAPSHOT" type VersionInfo struct { Number string `json:"number"` BuildSnapshot bool `json:"build_snapshot"` + BuildFlavor string `json:"build_flavor"` } func (v VersionInfo) Version() string { diff --git a/internal/serverless/project.go b/internal/serverless/project.go index d95cd2806c..65e285d644 100644 --- a/internal/serverless/project.go +++ b/internal/serverless/project.go @@ -6,21 +6,13 @@ package serverless import ( "context" - "encoding/json" "fmt" - "io" - "net/http" - "net/url" - "os" - "path/filepath" - "strings" "time" "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/fleetserver" "github.com/elastic/elastic-package/internal/kibana" "github.com/elastic/elastic-package/internal/logger" - "github.com/elastic/elastic-package/internal/profile" - "github.com/elastic/elastic-package/internal/registry" ) const ( @@ -138,54 +130,6 @@ func (p *Project) DefaultFleetServerURL(ctx context.Context, kibanaClient *kiban return fleetURL, nil } -func (p *Project) AddLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error { - logstashFleetOutput := kibana.FleetOutput{ - Name: "logstash-output", - ID: FleetLogstashOutput, - Type: "logstash", - Hosts: []string{"logstash:5044"}, - } - - if err := kibanaClient.AddFleetOutput(ctx, logstashFleetOutput); err != nil { - return fmt.Errorf("failed to add logstash fleet output: %w", err) - } - - return nil -} - -func (p *Project) UpdateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error { - certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent") - - caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem")) - if err != nil { - return fmt.Errorf("failed to read ca certificate: %w", err) - } - - certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem")) - if err != nil { - return fmt.Errorf("failed to read client certificate: %w", err) - } - - keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem")) - if err != nil { - return fmt.Errorf("failed to read client certificate private key: %w", err) - } - - logstashFleetOutput := kibana.FleetOutput{ - SSL: &kibana.AgentSSL{ - CertificateAuthorities: []string{string(caFile)}, - Certificate: string(certFile), - Key: string(keyFile), - }, - } - - if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, FleetLogstashOutput); err != nil { - return fmt.Errorf("failed to update logstash fleet output: %w", err) - } - - return nil -} - func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error { return elasticsearchClient.CheckHealth(ctx) } @@ -195,35 +139,14 @@ func (p *Project) getKibanaHealth(ctx context.Context, kibanaClient *kibana.Clie } func (p *Project) getFleetHealth(ctx context.Context) error { - statusURL, err := url.JoinPath(p.Endpoints.Fleet, "/api/status") + client, err := fleetserver.NewClient(p.Endpoints.Fleet) if err != nil { - return fmt.Errorf("could not build URL: %w", err) + return fmt.Errorf("could not create Fleet Server client: %w", err) } - logger.Debugf("GET %s", statusURL) - req, err := http.NewRequestWithContext(ctx, "GET", statusURL, nil) + status, err := client.Status(ctx) if err != nil { return err } - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("request failed (url: %s): %w", statusURL, err) - } - defer resp.Body.Close() - if resp.StatusCode >= 300 { - return fmt.Errorf("unexpected status code %v", resp.StatusCode) - } - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("failed to read response body: %w", err) - } - var status struct { - Name string `json:"name"` - Status string `json:"status"` - } - err = json.Unmarshal(body, &status) - if err != nil { - return fmt.Errorf("failed to parse response body: %w", err) - } if status.Status != "HEALTHY" { return fmt.Errorf("fleet status %s", status.Status) @@ -231,58 +154,3 @@ func (p *Project) getFleetHealth(ctx context.Context) error { } return nil } - -func (p *Project) CreateAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) error { - policy := kibana.Policy{ - ID: "elastic-agent-managed-ep", - Name: "Elastic-Agent (elastic-package)", - Description: "Policy created by elastic-package", - Namespace: "default", - MonitoringEnabled: []string{}, - DataOutputID: outputId, - } - if selfMonitor { - policy.MonitoringEnabled = []string{"logs", "metrics"} - } - - newPolicy, err := kibanaClient.CreatePolicy(ctx, policy) - if err != nil { - return fmt.Errorf("error while creating agent policy: %w", err) - } - - if selfMonitor { - err := p.createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace) - if err != nil { - return err - } - } - - return nil -} - -func (p *Project) createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error { - systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{ - KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX), - }) - if err != nil { - return fmt.Errorf("could not get the system package version for Kibana %v: %w", stackVersion, err) - } - if len(systemPackages) != 1 { - return fmt.Errorf("unexpected number of system package versions for Kibana %s - found %d expected 1", stackVersion, len(systemPackages)) - } - logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version) - packagePolicy := kibana.PackagePolicy{ - Name: "system-1", - PolicyID: agentPolicyID, - Namespace: namespace, - } - packagePolicy.Package.Name = "system" - packagePolicy.Package.Version = systemPackages[0].Version - - _, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy) - if err != nil { - return fmt.Errorf("error while creating package policy: %w", err) - } - - return nil -} diff --git a/internal/stack/agentpolicy.go b/internal/stack/agentpolicy.go new file mode 100644 index 0000000000..a854775e3a --- /dev/null +++ b/internal/stack/agentpolicy.go @@ -0,0 +1,145 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package stack + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/elastic/elastic-package/internal/kibana" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/profile" + "github.com/elastic/elastic-package/internal/registry" +) + +const ( + managedAgentPolicyID = "elastic-agent-managed-ep" + fleetLogstashOutput = "fleet-logstash-output" +) + +// createAgentPolicy creates an agent policy with the initial configuration used for +// agents managed by elastic-package. +func createAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) (*kibana.Policy, error) { + policy := kibana.Policy{ + ID: managedAgentPolicyID, + Name: "Elastic-Agent (elastic-package)", + Description: "Policy created by elastic-package", + Namespace: "default", + MonitoringEnabled: []string{}, + DataOutputID: outputId, + } + if selfMonitor { + policy.MonitoringEnabled = []string{"logs", "metrics"} + } + + newPolicy, err := kibanaClient.CreatePolicy(ctx, policy) + if errors.Is(err, kibana.ErrConflict) { + newPolicy, err = kibanaClient.GetPolicy(ctx, policy.ID) + if err != nil { + return nil, fmt.Errorf("error while getting existing policy: %w", err) + } + return newPolicy, nil + } + if err != nil { + return nil, fmt.Errorf("error while creating agent policy: %w", err) + } + + if selfMonitor { + err := createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace) + if err != nil { + return nil, err + } + } + + return newPolicy, nil +} + +func createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error { + systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{ + KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX), + }) + if err != nil { + return fmt.Errorf("could not get the system package version for Kibana %v: %w", stackVersion, err) + } + if len(systemPackages) != 1 { + return fmt.Errorf("unexpected number of system package versions for Kibana %s - found %d expected 1", stackVersion, len(systemPackages)) + } + logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version) + packagePolicy := kibana.PackagePolicy{ + Name: "system-1", + PolicyID: agentPolicyID, + Namespace: namespace, + } + packagePolicy.Package.Name = "system" + packagePolicy.Package.Version = systemPackages[0].Version + + _, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy) + if err != nil { + return fmt.Errorf("error while creating package policy: %w", err) + } + + return nil +} + +func addFleetOutput(ctx context.Context, client *kibana.Client, outputType, host, id string) error { + output := kibana.FleetOutput{ + Name: id, + ID: id, + Type: outputType, + Hosts: []string{host}, + } + + err := client.AddFleetOutput(ctx, output) + if errors.Is(err, kibana.ErrConflict) { + // Output already exists. + return nil + } + if err != nil { + return fmt.Errorf("failed to add %s fleet output of type %s: %w", id, outputType, err) + } + + return nil +} + +func addLogstashFleetOutput(ctx context.Context, client *kibana.Client) error { + return addFleetOutput(ctx, client, "logstash", "logstash:5044", fleetLogstashOutput) +} + +func updateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error { + certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent") + + caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem")) + if err != nil { + return fmt.Errorf("failed to read ca certificate: %w", err) + } + + certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem")) + if err != nil { + return fmt.Errorf("failed to read client certificate: %w", err) + } + + keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem")) + if err != nil { + return fmt.Errorf("failed to read client certificate private key: %w", err) + } + + logstashFleetOutput := kibana.FleetOutput{ + SSL: &kibana.AgentSSL{ + CertificateAuthorities: []string{string(caFile)}, + Certificate: string(certFile), + Key: string(keyFile), + }, + } + + if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, fleetLogstashOutput); err != nil { + return fmt.Errorf("failed to update logstash fleet output: %w", err) + } + + return nil +} diff --git a/internal/stack/serverless.go b/internal/stack/serverless.go index ae44dfcc92..1ab9ad3e49 100644 --- a/internal/stack/serverless.go +++ b/internal/stack/serverless.go @@ -123,7 +123,7 @@ func (sp *serverlessProvider) createProject(ctx context.Context, settings projec } if settings.LogstashEnabled { - err = project.AddLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient) + err = addLogstashFleetOutput(ctx, sp.kibanaClient) if err != nil { return Config{}, err } @@ -269,18 +269,13 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error return fmt.Errorf("failed to create deployment: %w", err) } - project, err = sp.currentProjectWithClientsAndFleetEndpoint(ctx, config) - if err != nil { - return fmt.Errorf("failed to retrieve latest project created: %w", err) - } - outputID := "" if settings.LogstashEnabled { outputID = serverless.FleetLogstashOutput } logger.Infof("Creating agent policy") - err = project.CreateAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor) + _, err = createAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor) if err != nil { return fmt.Errorf("failed to create agent policy: %w", err) @@ -303,7 +298,7 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error // Updating the output with ssl certificates created in startLocalServices // The certificates are updated only when a new project is created and logstash is enabled if isNewProject && settings.LogstashEnabled { - err = project.UpdateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient) + err = updateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient) if err != nil { return err }