Remove code copied from serverless
jsoriano committed Jan 13, 2025
commit 4e1cceaae0f0fdbcb65b6c714aabf119c060de7a
140 changes: 4 additions & 136 deletions internal/serverless/project.go
@@ -6,21 +6,13 @@ package serverless

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/fleetserver"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/registry"
)

const (
@@ -138,54 +130,6 @@ func (p *Project) DefaultFleetServerURL(ctx context.Context, kibanaClient *kiban
return fleetURL, nil
}

func (p *Project) AddLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error {
logstashFleetOutput := kibana.FleetOutput{
Name: "logstash-output",
ID: FleetLogstashOutput,
Type: "logstash",
Hosts: []string{"logstash:5044"},
}

if err := kibanaClient.AddFleetOutput(ctx, logstashFleetOutput); err != nil {
return fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return nil
}

func (p *Project) UpdateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error {
certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent")

caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem"))
if err != nil {
return fmt.Errorf("failed to read ca certificate: %w", err)
}

certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate: %w", err)
}

keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate private key: %w", err)
}

logstashFleetOutput := kibana.FleetOutput{
SSL: &kibana.AgentSSL{
CertificateAuthorities: []string{string(caFile)},
Certificate: string(certFile),
Key: string(keyFile),
},
}

if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, FleetLogstashOutput); err != nil {
return fmt.Errorf("failed to update logstash fleet output: %w", err)
}

return nil
}

func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
return elasticsearchClient.CheckHealth(ctx)
}
@@ -195,94 +139,18 @@ func (p *Project) getKibanaHealth(ctx context.Context, kibanaClient *kibana.Clie
}

func (p *Project) getFleetHealth(ctx context.Context) error {
statusURL, err := url.JoinPath(p.Endpoints.Fleet, "/api/status")
client, err := fleetserver.NewClient(p.Endpoints.Fleet)
if err != nil {
return fmt.Errorf("could not build URL: %w", err)
return fmt.Errorf("could not create Fleet Server client: %w", err)
}
logger.Debugf("GET %s", statusURL)
req, err := http.NewRequestWithContext(ctx, "GET", statusURL, nil)
status, err := client.Status(ctx)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("request failed (url: %s): %w", statusURL, err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
var status struct {
Name string `json:"name"`
Status string `json:"status"`
}
err = json.Unmarshal(body, &status)
if err != nil {
return fmt.Errorf("failed to parse response body: %w", err)
}

if status.Status != "HEALTHY" {
return fmt.Errorf("fleet status %s", status.Status)

}
return nil
}

func (p *Project) CreateAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) error {
policy := kibana.Policy{
ID: "elastic-agent-managed-ep",
Name: "Elastic-Agent (elastic-package)",
Description: "Policy created by elastic-package",
Namespace: "default",
MonitoringEnabled: []string{},
DataOutputID: outputId,
}
if selfMonitor {
policy.MonitoringEnabled = []string{"logs", "metrics"}
}

newPolicy, err := kibanaClient.CreatePolicy(ctx, policy)
if err != nil {
return fmt.Errorf("error while creating agent policy: %w", err)
}

if selfMonitor {
err := p.createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace)
if err != nil {
return err
}
}

return nil
}

func (p *Project) createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
if err != nil {
return fmt.Errorf("could not get the system package version for Kibana %v: %w", stackVersion, err)
}
if len(systemPackages) != 1 {
return fmt.Errorf("unexpected number of system package versions for Kibana %s - found %d expected 1", stackVersion, len(systemPackages))
}
logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version)
packagePolicy := kibana.PackagePolicy{
Name: "system-1",
PolicyID: agentPolicyID,
Namespace: namespace,
}
packagePolicy.Package.Name = "system"
packagePolicy.Package.Version = systemPackages[0].Version

_, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy)
if err != nil {
return fmt.Errorf("error while creating package policy: %w", err)
}

return nil
}
146 changes: 146 additions & 0 deletions internal/stack/agentpolicy.go
@@ -0,0 +1,146 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package stack

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/registry"
)

const (
managedAgentPolicyID = "elastic-agent-managed-ep"
fleetLogstashOutput = "fleet-logstash-output"
fleetElasticsearchOutput = "fleet-elasticsearch-output"
)

// createAgentPolicy creates an agent policy with the initial configuration used for
// agents managed by elastic-package.
func createAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) (*kibana.Policy, error) {
policy := kibana.Policy{
ID: managedAgentPolicyID,
Name: "Elastic-Agent (elastic-package)",
Description: "Policy created by elastic-package",
Namespace: "default",
MonitoringEnabled: []string{},
DataOutputID: outputId,
}
if selfMonitor {
policy.MonitoringEnabled = []string{"logs", "metrics"}
}

newPolicy, err := kibanaClient.CreatePolicy(ctx, policy)
if errors.Is(err, kibana.ErrConflict) {
newPolicy, err = kibanaClient.GetPolicy(ctx, policy.ID)
if err != nil {
return nil, fmt.Errorf("error while getting existing policy: %w", err)
}
return newPolicy, nil
}
if err != nil {
return nil, fmt.Errorf("error while creating agent policy: %w", err)
}

if selfMonitor {
err := createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace)
if err != nil {
return nil, err
}
}

return newPolicy, nil
}

func createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
if err != nil {
return fmt.Errorf("could not get the system package version for Kibana %v: %w", stackVersion, err)
}
if len(systemPackages) != 1 {
return fmt.Errorf("unexpected number of system package versions for Kibana %s - found %d expected 1", stackVersion, len(systemPackages))
}
logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version)
packagePolicy := kibana.PackagePolicy{
Name: "system-1",
PolicyID: agentPolicyID,
Namespace: namespace,
}
packagePolicy.Package.Name = "system"
packagePolicy.Package.Version = systemPackages[0].Version

_, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy)
if err != nil {
return fmt.Errorf("error while creating package policy: %w", err)
}

return nil
}

func addFleetOutput(ctx context.Context, client *kibana.Client, outputType, host, id string) error {
output := kibana.FleetOutput{
Name: id,
ID: id,
Comment on lines +92 to +93

Contributor:

Could this fail if tests are run in parallel? Or maybe this will not fail, but it will use an unexpected output. Or maybe it is good to reuse the same output when running in parallel.

Or well, the caller is the one responsible for assigning the right/expected IDs.

@jsoriano (Member, Author), Jan 14, 2025:

Yes, this can be an issue if we use the same stack for different environments, for example if a serverless cluster has two environments, one with Logstash and the other one without it. But it should not be an issue while running tests in parallel within a single environment.

For reusing a stack for different executions we have more conflicts; I would leave fixing them for future improvements. One issue we have is that we are relying on default Fleet configurations (output, fleet server, fleet server policy...), and I think we could avoid it.

I think that ideally each environment should use its own resources, they should be explicitly set in agent policies, and we should not rely on defaults.
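
A rough sketch of what explicit per-environment outputs could look like (not part of this PR; the environment parameter and the ID scheme are hypothetical):

// Hypothetical follow-up: derive the Fleet output ID from the environment
// instead of relying on the shared default constant.
func logstashOutputIDFor(environment string) string {
	return fmt.Sprintf("%s-%s", fleetLogstashOutput, environment)
}

func addLogstashFleetOutputFor(ctx context.Context, client *kibana.Client, environment string) error {
	return addFleetOutput(ctx, client, "logstash", "logstash:5044", logstashOutputIDFor(environment))
}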

Type: outputType,
Hosts: []string{host},
}

err := client.AddFleetOutput(ctx, output)
if errors.Is(err, kibana.ErrConflict) {
// Output already exists.
return nil
}
if err != nil {
return fmt.Errorf("failed to add %s fleet output of type %s: %w", id, outputType, err)
}

return nil
}

func addLogstashFleetOutput(ctx context.Context, client *kibana.Client) error {
return addFleetOutput(ctx, client, "logstash", "logstash:5044", fleetLogstashOutput)
}

func updateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error {
certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent")

caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem"))
if err != nil {
return fmt.Errorf("failed to read ca certificate: %w", err)
}

certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate: %w", err)
}

keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate private key: %w", err)
}

logstashFleetOutput := kibana.FleetOutput{
SSL: &kibana.AgentSSL{
CertificateAuthorities: []string{string(caFile)},
Certificate: string(certFile),
Key: string(keyFile),
},
}

if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, fleetLogstashOutput); err != nil {
return fmt.Errorf("failed to update logstash fleet output: %w", err)
}

return nil
}
11 changes: 3 additions & 8 deletions internal/stack/serverless.go
@@ -123,7 +123,7 @@ func (sp *serverlessProvider) createProject(ctx context.Context, settings projec
}

if settings.LogstashEnabled {
err = project.AddLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
err = addLogstashFleetOutput(ctx, sp.kibanaClient)
if err != nil {
return Config{}, err
}
@@ -269,18 +269,13 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error
return fmt.Errorf("failed to create deployment: %w", err)
}

project, err = sp.currentProjectWithClientsAndFleetEndpoint(ctx, config)
if err != nil {
return fmt.Errorf("failed to retrieve latest project created: %w", err)
}

Comment on lines -272 to -276

Contributor:

Nice! One less request (or two requests, depending on whether there is a default fleet server or not).

outputID := ""
if settings.LogstashEnabled {
outputID = serverless.FleetLogstashOutput
}

logger.Infof("Creating agent policy")
err = project.CreateAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor)
_, err = createAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor)
Contributor:

Not for this PR, but this method could probably be reused in system/tester.go to create the agent policies there too:

policyToEnroll, err = r.kibanaClient.CreatePolicy(ctx, policyEnroll)

policyToTest, err = r.kibanaClient.CreatePolicy(ctx, policy)
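
A rough sketch of what that reuse could look like, assuming createAgentPolicy were exported from internal/stack (the tester fields and arguments used here are assumptions, not part of this PR):

// Hypothetical follow-up in system/tester.go: reuse the shared helper instead
// of building the agent policy by hand.
policyToTest, err = stack.CreateAgentPolicy(ctx, r.kibanaClient, r.stackVersion, "", false)
if err != nil {
	return fmt.Errorf("error creating agent policy: %w", err)
}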


if err != nil {
return fmt.Errorf("failed to create agent policy: %w", err)
@@ -303,7 +298,7 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error
// Updating the output with ssl certificates created in startLocalServices
// The certificates are updated only when a new project is created and logstash is enabled
if isNewProject && settings.LogstashEnabled {
err = project.UpdateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
err = updateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
if err != nil {
return err
}