diff --git a/internal/agentdeployer/_static/docker-agent-base.yml.tmpl b/internal/agentdeployer/_static/docker-agent-base.yml.tmpl index f727b6d52d..49612eecbe 100644 --- a/internal/agentdeployer/_static/docker-agent-base.yml.tmpl +++ b/internal/agentdeployer/_static/docker-agent-base.yml.tmpl @@ -5,6 +5,7 @@ {{- $dockerfile_hash := fact "dockerfile_hash" -}} {{- $stack_version := fact "stack_version" }} {{- $agent_image := fact "agent_image" }} +{{- $enrollment_token := fact "enrollment_token" }} services: elastic-agent: hostname: ${AGENT_HOSTNAME} @@ -40,9 +41,13 @@ services: - FLEET_ENROLL=1 - FLEET_URL={{ fact "fleet_url" }} - KIBANA_HOST={{ fact "kibana_host" }} + {{ if eq $enrollment_token "" }} - FLEET_TOKEN_POLICY_NAME=${FLEET_TOKEN_POLICY_NAME} - ELASTICSEARCH_USERNAME={{ fact "elasticsearch_username" }} - ELASTICSEARCH_PASSWORD={{ fact "elasticsearch_password" }} + {{ else }} + - FLEET_ENROLLMENT_TOKEN={{ $enrollment_token }} + {{ end }} volumes: - type: bind source: ${LOCAL_CA_CERT} @@ -57,3 +62,5 @@ services: source: ${SERVICE_LOGS_DIR} target: /run/service_logs/ read_only: false + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/internal/agentdeployer/_static/elastic-agent-managed.yaml.tmpl b/internal/agentdeployer/_static/elastic-agent-managed.yaml.tmpl index e1f4a7dec2..11631c92c4 100644 --- a/internal/agentdeployer/_static/elastic-agent-managed.yaml.tmpl +++ b/internal/agentdeployer/_static/elastic-agent-managed.yaml.tmpl @@ -44,15 +44,15 @@ spec: value: {{ .fleetURL }} # If left empty KIBANA_HOST, KIBANA_FLEET_USERNAME, KIBANA_FLEET_PASSWORD are needed - name: FLEET_ENROLLMENT_TOKEN - value: "" + value: "{{ .enrollmentToken }}" - name: FLEET_TOKEN_POLICY_NAME value: "{{ .elasticAgentTokenPolicyName }}" - name: KIBANA_HOST value: {{ .kibanaURL }} - name: KIBANA_FLEET_USERNAME - value: "elastic" + value: {{ .username }} - name: KIBANA_FLEET_PASSWORD - value: "changeme" + value: {{ .password }} - name: SSL_CERT_DIR value: "/etc/ssl/certs:/etc/ssl/elastic-package" - name: NODE_NAME diff --git a/internal/agentdeployer/agent.go b/internal/agentdeployer/agent.go index a523ec1520..c8fd2abcdd 100644 --- a/internal/agentdeployer/agent.go +++ b/internal/agentdeployer/agent.go @@ -119,7 +119,7 @@ func (d *DockerComposeAgentDeployer) SetUp(ctx context.Context, agentInfo AgentI fmt.Sprintf("%s=%s", agentHostnameEnv, d.agentHostname()), ) - configDir, err := d.installDockerCompose(agentInfo) + configDir, err := d.installDockerCompose(ctx, agentInfo) if err != nil { return nil, fmt.Errorf("could not create resources for custom agent: %w", err) } @@ -233,7 +233,7 @@ func (d *DockerComposeAgentDeployer) agentName() string { // installDockerCompose creates the files needed to run the custom elastic agent and returns // the directory with these files. 
-func (d *DockerComposeAgentDeployer) installDockerCompose(agentInfo AgentInfo) (string, error) { +func (d *DockerComposeAgentDeployer) installDockerCompose(ctx context.Context, agentInfo AgentInfo) (string, error) { customAgentDir, err := CreateDeployerDir(d.profile, fmt.Sprintf("docker-agent-%s-%s", d.agentName(), d.agentRunID)) if err != nil { return "", fmt.Errorf("failed to create directory for custom agent files: %w", err) @@ -254,14 +254,31 @@ func (d *DockerComposeAgentDeployer) installDockerCompose(agentInfo AgentInfo) ( if err != nil { return "", fmt.Errorf("failed to load config from profile: %w", err) } + enrollmentToken := "" + if config.ElasticsearchAPIKey != "" { + // TODO: Review if this is the correct place to get the enrollment token. + kibanaClient, err := stack.NewKibanaClientFromProfile(d.profile) + if err != nil { + return "", fmt.Errorf("failed to create kibana client: %w", err) + } + enrollmentToken, err = kibanaClient.GetEnrollmentTokenForPolicyID(ctx, agentInfo.Policy.ID) + if err != nil { + return "", fmt.Errorf("failed to get enrollment token for policy %q: %w", agentInfo.Policy.Name, err) + } + } + // TODO: Include these settings more explicitly in `config`. fleetURL := "https://fleet-server:8220" kibanaHost := "https://kibana:5601" stackVersion := d.stackVersion - if config.Provider == stack.ProviderServerless { - fleetURL = config.Parameters[stack.ParamServerlessFleetURL] + if config.Provider != stack.ProviderCompose { kibanaHost = config.KibanaHost - stackVersion = config.Parameters[stack.ParamServerlessLocalStackVersion] + } + if url, ok := config.Parameters[stack.ParamServerlessFleetURL]; ok { + fleetURL = url + } + if version, ok := config.Parameters[stack.ParamServerlessLocalStackVersion]; ok { + stackVersion = version } agentImage, err := selectElasticAgentImage(stackVersion, agentInfo.Agent.BaseImage) @@ -280,9 +297,10 @@ func (d *DockerComposeAgentDeployer) installDockerCompose(agentInfo AgentInfo) ( "dockerfile_hash": hex.EncodeToString(hashDockerfile), "stack_version": stackVersion, "fleet_url": fleetURL, - "kibana_host": kibanaHost, + "kibana_host": stack.DockerInternalHost(kibanaHost), "elasticsearch_username": config.ElasticsearchUsername, "elasticsearch_password": config.ElasticsearchPassword, + "enrollment_token": enrollmentToken, }) resourceManager.RegisterProvider("file", &resource.FileProvider{ diff --git a/internal/agentdeployer/info.go b/internal/agentdeployer/info.go index 919c2ef980..f2fc9e339d 100644 --- a/internal/agentdeployer/info.go +++ b/internal/agentdeployer/info.go @@ -107,7 +107,4 @@ type AgentInfo struct { AgentSettings } - - // CustomProperties store additional data used to boot up the service, e.g. AWS credentials. 
- CustomProperties map[string]interface{} } diff --git a/internal/agentdeployer/kubernetes.go b/internal/agentdeployer/kubernetes.go index e7f20e6604..184e11fc33 100644 --- a/internal/agentdeployer/kubernetes.go +++ b/internal/agentdeployer/kubernetes.go @@ -57,7 +57,7 @@ type kubernetesDeployedAgent struct { } func (s kubernetesDeployedAgent) TearDown(ctx context.Context) error { - elasticAgentManagedYaml, err := getElasticAgentYAML(s.profile, s.stackVersion, s.agentInfo.Policy.Name, s.agentName) + elasticAgentManagedYaml, err := getElasticAgentYAML(ctx, s.profile, s.agentInfo, s.stackVersion, s.agentName) if err != nil { return fmt.Errorf("can't retrieve Kubernetes file for Elastic Agent: %w", err) } @@ -123,7 +123,7 @@ func (ksd *KubernetesAgentDeployer) SetUp(ctx context.Context, agentInfo AgentIn if ksd.runTearDown || ksd.runTestsOnly { logger.Debug("Skip install Elastic Agent in cluster") } else { - err = installElasticAgentInCluster(ctx, ksd.profile, ksd.stackVersion, agentInfo.Policy.Name, agentName) + err = installElasticAgentInCluster(ctx, ksd.profile, agentInfo, ksd.stackVersion, agentName) if err != nil { return nil, fmt.Errorf("can't install Elastic-Agent in the Kubernetes cluster: %w", err) } @@ -155,10 +155,10 @@ func (ksd *KubernetesAgentDeployer) agentName() string { var _ AgentDeployer = new(KubernetesAgentDeployer) -func installElasticAgentInCluster(ctx context.Context, profile *profile.Profile, stackVersion, policyName, agentName string) error { +func installElasticAgentInCluster(ctx context.Context, profile *profile.Profile, agentInfo AgentInfo, stackVersion, agentName string) error { logger.Debug("install Elastic Agent in the Kubernetes cluster") - elasticAgentManagedYaml, err := getElasticAgentYAML(profile, stackVersion, policyName, agentName) + elasticAgentManagedYaml, err := getElasticAgentYAML(ctx, profile, agentInfo, stackVersion, agentName) if err != nil { return fmt.Errorf("can't retrieve Kubernetes file for Elastic Agent: %w", err) } @@ -176,8 +176,36 @@ func installElasticAgentInCluster(ctx context.Context, profile *profile.Profile, //go:embed _static/elastic-agent-managed.yaml.tmpl var elasticAgentManagedYamlTmpl string -func getElasticAgentYAML(profile *profile.Profile, stackVersion, policyName, agentName string) ([]byte, error) { +func getElasticAgentYAML(ctx context.Context, profile *profile.Profile, agentInfo AgentInfo, stackVersion, agentName string) ([]byte, error) { logger.Debugf("Prepare YAML definition for Elastic Agent running in stack v%s", stackVersion) + config, err := stack.LoadConfig(profile) + if err != nil { + return nil, fmt.Errorf("failed to load config from profile: %w", err) + } + fleetURL := "https://fleet-server:8220" + kibanaURL := "https://kibana:5601" + if config.Provider != stack.ProviderCompose { + kibanaURL = config.KibanaHost + } + if url, ok := config.Parameters[stack.ParamServerlessFleetURL]; ok { + fleetURL = url + } + if version, ok := config.Parameters[stack.ParamServerlessLocalStackVersion]; ok { + stackVersion = version + } + + enrollmentToken := "" + if config.ElasticsearchAPIKey != "" { + // TODO: Review if this is the correct place to get the enrollment token. 
+ kibanaClient, err := stack.NewKibanaClientFromProfile(profile) + if err != nil { + return nil, fmt.Errorf("failed to create kibana client: %w", err) + } + enrollmentToken, err = kibanaClient.GetEnrollmentTokenForPolicyID(ctx, agentInfo.Policy.ID) + if err != nil { + return nil, fmt.Errorf("failed to get enrollment token for policy %q: %w", agentInfo.Policy.Name, err) + } + } appConfig, err := install.Configuration(install.OptionWithStackVersion(stackVersion)) if err != nil { @@ -193,11 +221,14 @@ func getElasticAgentYAML(profile *profile.Profile, stackVersion, policyName, age var elasticAgentYaml bytes.Buffer err = tmpl.Execute(&elasticAgentYaml, map[string]string{ - "fleetURL": "https://fleet-server:8220", - "kibanaURL": "https://kibana:5601", + "fleetURL": fleetURL, + "kibanaURL": kibanaURL, + "username": config.ElasticsearchUsername, + "password": config.ElasticsearchPassword, + "enrollmentToken": enrollmentToken, "caCertPem": caCert, "elasticAgentImage": appConfig.StackImageRefs().ElasticAgent, - "elasticAgentTokenPolicyName": getTokenPolicyName(stackVersion, policyName), + "elasticAgentTokenPolicyName": getTokenPolicyName(stackVersion, agentInfo.Policy.Name), "agentName": agentName, }) if err != nil { diff --git a/internal/kubectl/kubectl_apply.go b/internal/kubectl/kubectl_apply.go index 460b72b2fd..e0527d162d 100644 --- a/internal/kubectl/kubectl_apply.go +++ b/internal/kubectl/kubectl_apply.go @@ -135,6 +135,8 @@ func waitForReadyResources(resources []resource) error { // be unavailable (DaemonSet.spec.updateStrategy.rollingUpdate.maxUnavailable defaults to 1). // daemonSetReady will return true regardless of the pod not being ready yet. // Can be solved with multi-node clusters. + // TODO: Support context cancelation in this wait. We rely on a helm waiter + // that doesn't support it. 
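+	// One possible approach (not implemented here): run the wait in a separate
+	// goroutine and return early when ctx is done, accepting that the helm waiter
+	// keeps running in the background until its own timeout expires.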
err := kubeClient.Wait(resList, readinessTimeout) if err != nil { return fmt.Errorf("waiter failed: %w", err) diff --git a/internal/serverless/project.go b/internal/serverless/project.go index 65e285d644..6d13acc22c 100644 --- a/internal/serverless/project.go +++ b/internal/serverless/project.go @@ -31,8 +31,8 @@ type Project struct { Region string `json:"region_id"` Credentials struct { - Username string `json:"username"` - Password string `json:"password"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` } `json:"credentials"` Endpoints struct { @@ -150,7 +150,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error { if status.Status != "HEALTHY" { return fmt.Errorf("fleet status %s", status.Status) - } + return nil } diff --git a/internal/stack/_static/elastic-agent.env.tmpl b/internal/stack/_static/elastic-agent.env.tmpl index d8888122e1..03242b6c2a 100644 --- a/internal/stack/_static/elastic-agent.env.tmpl +++ b/internal/stack/_static/elastic-agent.env.tmpl @@ -3,8 +3,13 @@ FLEET_ENROLL=1 FLEET_URL={{ fact "fleet_url" }} KIBANA_FLEET_HOST={{ fact "kibana_host" }} KIBANA_HOST={{ fact "kibana_host" }} +{{- $enrollment_token := fact "enrollment_token" }} +{{- if eq $enrollment_token "" }} ELASTICSEARCH_USERNAME={{ fact "username" }} ELASTICSEARCH_PASSWORD={{ fact "password" }} -{{ if not (semverLessThan $version "8.0.0") }} +{{- if not (semverLessThan $version "8.0.0") }} FLEET_TOKEN_POLICY_NAME=Elastic-Agent (elastic-package) -{{ end }} +{{- end }} +{{- else }} +FLEET_ENROLLMENT_TOKEN={{ $enrollment_token }} +{{- end }} diff --git a/internal/stack/_static/fleet-server-healthcheck.sh b/internal/stack/_static/fleet-server-healthcheck.sh index 77cb1021b4..57b437a8a7 100644 --- a/internal/stack/_static/fleet-server-healthcheck.sh +++ b/internal/stack/_static/fleet-server-healthcheck.sh @@ -6,7 +6,7 @@ NUMBER_SUCCESSES="$1" WAITING_TIME="$2" healthcheck() { - curl -s --cacert /etc/ssl/elastic-agent/ca-cert.pem -f https://localhost:8220/api/status | grep -i healthy 2>&1 >/dev/null + curl -s --cacert /etc/ssl/certs/elastic-package.pem -f https://localhost:8220/api/status | grep -i healthy 2>&1 >/dev/null } # Fleet Server can restart after announcing to be healthy, agents connecting during this restart will diff --git a/internal/stack/_static/local-services-docker-compose.yml.tmpl b/internal/stack/_static/local-services-docker-compose.yml.tmpl new file mode 100644 index 0000000000..f7446bba6b --- /dev/null +++ b/internal/stack/_static/local-services-docker-compose.yml.tmpl @@ -0,0 +1,116 @@ +services: +{{- $fleet_server_managed := fact "fleet_server_managed" }} +{{- if eq $fleet_server_managed "true" }} + {{- $fleet_healthcheck_success_checks := 3 -}} + {{- $fleet_healthcheck_waiting_time := 1 -}} + {{- $version := fact "agent_version" -}} + {{- if semverLessThan $version "8.0.0" -}} + {{- $fleet_healthcheck_success_checks = 10 -}} + {{- $fleet_healthcheck_waiting_time = 2 -}} + {{- end }} + fleet-server: + image: "{{ fact "agent_image" }}" + healthcheck: + test: "bash /healthcheck.sh {{ $fleet_healthcheck_success_checks }} {{ $fleet_healthcheck_waiting_time }}" + start_period: 60s + interval: 5s + hostname: docker-fleet-server + environment: + - "ELASTICSEARCH_HOST={{ fact "elasticsearch_host" }}" + - "FLEET_SERVER_CERT=/etc/ssl/fleet-server/cert.pem" + - "FLEET_SERVER_CERT_KEY=/etc/ssl/fleet-server/key.pem" + - "FLEET_SERVER_ELASTICSEARCH_HOST={{ fact "elasticsearch_host" }}" + - "FLEET_SERVER_ENABLE=1" + - "FLEET_SERVER_HOST=0.0.0.0" + - 
"FLEET_SERVER_SERVICE_TOKEN={{ fact "fleet_service_token" }}" + - "FLEET_SERVER_POLICY={{ fact "fleet_server_policy" }}" + - "FLEET_URL={{ fact "fleet_url" }}" + - "KIBANA_FLEET_HOST={{ fact "kibana_host" }}" + - "KIBANA_FLEET_SERVICE_TOKEN={{ fact "fleet_service_token" }}" + - "KIBANA_FLEET_SERVER_POLICY={{ fact "fleet_server_policy" }}" + - "KIBANA_FLEET_SETUP=1" + - "KIBANA_HOST={{ fact "kibana_host" }}" + volumes: + - "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem:ro" + - "../certs/fleet-server:/etc/ssl/fleet-server:ro" + - "./fleet-server-healthcheck.sh:/healthcheck.sh:ro" + ports: + - "127.0.0.1:8220:8220" + extra_hosts: + - "host.docker.internal:host-gateway" + + fleet-server_is_ready: + image: tianon/true:multiarch + depends_on: + fleet-server: + condition: service_healthy +{{- end }} + + elastic-agent: + image: "{{ fact "agent_image" }}" +{{- if eq $fleet_server_managed "true" }} + depends_on: + fleet-server: + condition: service_healthy +{{- end }} + healthcheck: + test: "elastic-agent status" + timeout: 2s + start_period: 360s + retries: 180 + interval: 5s + hostname: docker-fleet-agent + env_file: "./elastic-agent.env" + cap_drop: + - ALL + volumes: + - type: bind + source: ../../../tmp/service_logs/ + target: /tmp/service_logs/ + # Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235). + - type: bind + source: ../../../tmp/service_logs/ + target: /run/service_logs/ + - "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem" + extra_hosts: + - "host.docker.internal:host-gateway" + + elastic-agent_is_ready: + image: tianon/true:multiarch + depends_on: + elastic-agent: + condition: service_healthy + +{{ $logstash_enabled := fact "logstash_enabled" }} +{{ if eq $logstash_enabled "true" }} + logstash: + build: + dockerfile: "./Dockerfile.logstash" + args: + IMAGE: "{{ fact "logstash_image" }}" + healthcheck: + test: bin/logstash -t + start_period: 120s + interval: 60s + timeout: 60s + retries: 5 + volumes: + - "../certs/logstash:/usr/share/logstash/config/certs" + ports: + - "127.0.0.1:5044:5044" + - "127.0.0.1:9600:9600" + environment: + - XPACK_MONITORING_ENABLED=false + - ELASTIC_API_KEY={{ fact "api_key" }} + - ELASTIC_USER={{ fact "username" }} + - ELASTIC_PASSWORD={{ fact "password" }} + - ELASTIC_HOSTS={{ fact "elasticsearch_host" }} + extra_hosts: + - "host.docker.internal:host-gateway" + + logstash_is_ready: + image: tianon/true:multiarch + depends_on: + logstash: + condition: service_healthy +{{ end }} diff --git a/internal/stack/_static/logstash.conf.tmpl b/internal/stack/_static/logstash.conf.tmpl index ee90cd63fc..ae587f6889 100644 --- a/internal/stack/_static/logstash.conf.tmpl +++ b/internal/stack/_static/logstash.conf.tmpl @@ -9,12 +9,17 @@ input { } {{ $elasticsearch_host := fact "elasticsearch_host" -}} +{{ $api_key := fact "api_key" -}} filter { elastic_integration { remove_field => ['@version'] hosts => ["{{ $elasticsearch_host }}"] + {{- if eq $api_key "" }} username => '{{ fact "username" }}' password => '{{ fact "password" }}' + {{- else }} + api_key => '{{ $api_key }}' + {{- end }} ssl_enabled => true ssl_verification_mode => "none" } @@ -24,8 +29,12 @@ output { if [@metadata][_ingest_document][id] { elasticsearch { hosts => ["{{ $elasticsearch_host }}"] + {{- if eq $api_key "" }} user => '{{ fact "username" }}' password => '{{ fact "password" }}' + {{- else }} + api_key => '{{ $api_key }}' + {{- end }} ssl_enabled => true {{- if eq $elasticsearch_host "https://elasticsearch:9200" }} 
ssl_certificate_authorities => "/usr/share/logstash/config/certs/ca-cert.pem" @@ -35,8 +44,12 @@ output { } else { elasticsearch { hosts => ["{{ $elasticsearch_host }}"] + {{- if eq $api_key "" }} user => '{{ fact "username" }}' password => '{{ fact "password" }}' + {{- else }} + api_key => '{{ $api_key }}' + {{- end }} ssl_enabled => true {{- if eq $elasticsearch_host "https://elasticsearch:9200" }} ssl_certificate_authorities => "/usr/share/logstash/config/certs/ca-cert.pem" diff --git a/internal/stack/_static/serverless-docker-compose.yml.tmpl b/internal/stack/_static/serverless-docker-compose.yml.tmpl deleted file mode 100644 index bfe053700b..0000000000 --- a/internal/stack/_static/serverless-docker-compose.yml.tmpl +++ /dev/null @@ -1,59 +0,0 @@ -services: - elastic-agent: - image: "{{ fact "agent_image" }}" - healthcheck: - test: "elastic-agent status" - timeout: 2s - start_period: 360s - retries: 180 - interval: 5s - hostname: docker-fleet-agent - env_file: "./elastic-agent.env" - cap_drop: - - ALL - volumes: - - type: bind - source: ../../../tmp/service_logs/ - target: /tmp/service_logs/ - # Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235). - - type: bind - source: ../../../tmp/service_logs/ - target: /run/service_logs/ - - "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem" - - elastic-agent_is_ready: - image: tianon/true:multiarch - depends_on: - elastic-agent: - condition: service_healthy - -{{ $logstash_enabled := fact "logstash_enabled" }} -{{ if eq $logstash_enabled "true" }} - logstash: - build: - dockerfile: "./Dockerfile.logstash" - args: - IMAGE: "{{ fact "logstash_image" }}" - healthcheck: - test: bin/logstash -t - start_period: 120s - interval: 60s - timeout: 60s - retries: 5 - volumes: - - "../certs/logstash:/usr/share/logstash/config/certs" - ports: - - "127.0.0.1:5044:5044" - - "127.0.0.1:9600:9600" - environment: - - XPACK_MONITORING_ENABLED=false - - ELASTIC_USER={{ fact "username" }} - - ELASTIC_PASSWORD={{ fact "password" }} - - ELASTIC_HOSTS={{ fact "elasticsearch_host" }} - - logstash_is_ready: - image: tianon/true:multiarch - depends_on: - logstash: - condition: service_healthy -{{ end }} diff --git a/internal/stack/agentpolicy.go b/internal/stack/agentpolicy.go index a854775e3a..833b1fe0db 100644 --- a/internal/stack/agentpolicy.go +++ b/internal/stack/agentpolicy.go @@ -19,8 +19,9 @@ import ( ) const ( - managedAgentPolicyID = "elastic-agent-managed-ep" - fleetLogstashOutput = "fleet-logstash-output" + managedAgentPolicyID = "elastic-agent-managed-ep" + fleetLogstashOutput = "fleet-logstash-output" + fleetElasticsearchOutput = "fleet-elasticsearch-output" ) // createAgentPolicy creates an agent policy with the initial configuration used for @@ -87,6 +88,32 @@ func createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, return nil } +func deleteAgentPolicy(ctx context.Context, kibanaClient *kibana.Client) error { + err := kibanaClient.DeletePolicy(ctx, managedAgentPolicyID) + var notFoundError *kibana.ErrPolicyNotFound + if err != nil && !errors.As(err, ¬FoundError) { + return fmt.Errorf("failed to delete policy: %w", err) + } + + return nil +} + +func forceUnenrollAgentsWithPolicy(ctx context.Context, kibanaClient *kibana.Client) error { + agents, err := kibanaClient.QueryAgents(ctx, fmt.Sprintf("policy_id: %s", managedAgentPolicyID)) + if err != nil { + return fmt.Errorf("error while querying agents with policy %s: %w", managedAgentPolicyID, err) + } + + for _, agent := 
range agents { + err := kibanaClient.RemoveAgent(ctx, agent) + if err != nil { + return fmt.Errorf("failed to remove agent %s: %w", agent.ID, err) + } + } + + return nil +} + func addFleetOutput(ctx context.Context, client *kibana.Client, outputType, host, id string) error { output := kibana.FleetOutput{ Name: id, @@ -111,6 +138,10 @@ func addLogstashFleetOutput(ctx context.Context, client *kibana.Client) error { return addFleetOutput(ctx, client, "logstash", "logstash:5044", fleetLogstashOutput) } +func addElasticsearchFleetOutput(ctx context.Context, client *kibana.Client, host string) error { + return addFleetOutput(ctx, client, "elasticsearch", host, fleetElasticsearchOutput) +} + func updateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error { certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent") diff --git a/internal/stack/certs.go b/internal/stack/certs.go index 402b978959..f54c2090a3 100644 --- a/internal/stack/certs.go +++ b/internal/stack/certs.go @@ -32,9 +32,12 @@ var tlsServices = []tlsService{ {Name: "elastic-agent", IsClient: true}, } -var tlsServicesServerless = []tlsService{ - {Name: "logstash"}, +// tlsLocalServices is the list of server TLS certificates that will +// be created for local services when the stack is not local. +var tlsLocalServices = []tlsService{ {Name: "elastic-agent", IsClient: true}, + {Name: "fleet-server"}, + {Name: "logstash"}, } var ( diff --git a/internal/stack/certs_test.go b/internal/stack/certs_test.go index 07b6183a7c..d5299e6b13 100644 --- a/internal/stack/certs_test.go +++ b/internal/stack/certs_test.go @@ -21,7 +21,7 @@ func TestTLSCertsInitialization(t *testing.T) { services []tlsService }{ {"tlsServices", tlsServices}, - {"tlsServicesServerless", tlsServicesServerless}, + {"tlsServicesServerless", tlsLocalServices}, } profilePath := t.TempDir() caCertFile := filepath.Join(profilePath, "certs", "ca-cert.pem") diff --git a/internal/stack/config.go b/internal/stack/config.go index db97653229..194aaef615 100644 --- a/internal/stack/config.go +++ b/internal/stack/config.go @@ -26,6 +26,17 @@ type Config struct { ElasticsearchPassword string `json:"elasticsearch_password,omitempty"` KibanaHost string `json:"kibana_host,omitempty"` CACertFile string `json:"ca_cert_file,omitempty"` + + OutputID string `json:"output_id,omitempty"` + FleetServerID string `json:"fleet_server_id,omitempty"` + + // EnrollmentToken is the token used during initialization, it can expire, + // so don't persist it, it won't be reused. + EnrollmentToken string `json:"-"` + + // FleetServiceToken is the service token used during initialization when + // a local Fleet Server is needed. 
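+	// Like the enrollment token, it is only needed while booting up the stack,
+	// so it is not persisted either.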
+	FleetServiceToken string `json:"-"`
 }
 
 func configPath(profile *profile.Profile) string {
diff --git a/internal/stack/dump.go b/internal/stack/dump.go
index ba12b4fba9..64cee07bac 100644
--- a/internal/stack/dump.go
+++ b/internal/stack/dump.go
@@ -61,7 +61,10 @@ func dumpStackLogs(ctx context.Context, options DumpOptions) ([]DumpResult, erro
 		return nil, fmt.Errorf("can't remove output location: %w", err)
 	}
 
-	services, err := localServiceNames(DockerComposeProjectName(options.Profile))
+	localServices := &localServicesManager{
+		profile: options.Profile,
+	}
+	services, err := localServices.serviceNames()
 	if err != nil {
 		return nil, fmt.Errorf("failed to get local services: %w", err)
 	}
diff --git a/internal/stack/environment.go b/internal/stack/environment.go
new file mode 100644
index 0000000000..fdaffa505b
--- /dev/null
+++ b/internal/stack/environment.go
@@ -0,0 +1,416 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package stack
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/elastic/elastic-package/internal/elasticsearch"
+	"github.com/elastic/elastic-package/internal/fleetserver"
+	"github.com/elastic/elastic-package/internal/kibana"
+	"github.com/elastic/elastic-package/internal/logger"
+	"github.com/elastic/elastic-package/internal/profile"
+)
+
+type environmentProvider struct {
+	kibana        *kibana.Client
+	elasticsearch *elasticsearch.Client
+}
+
+func newEnvironmentProvider(profile *profile.Profile) (*environmentProvider, error) {
+	return &environmentProvider{}, nil
+}
+
+// BootUp configures the profile to use the stack indicated by environment variables.
+func (p *environmentProvider) BootUp(ctx context.Context, options Options) error {
+	logger.Warn("Configuring a stack from environment variables is in technical preview")
+	config := Config{
+		Provider:              ProviderEnvironment,
+		ElasticsearchAPIKey:   os.Getenv(ElasticsearchAPIKeyEnv),
+		ElasticsearchHost:     os.Getenv(ElasticsearchHostEnv),
+		ElasticsearchUsername: os.Getenv(ElasticsearchUsernameEnv),
+		ElasticsearchPassword: os.Getenv(ElasticsearchPasswordEnv),
+		KibanaHost:            os.Getenv(KibanaHostEnv),
+		CACertFile:            os.Getenv(CACertificateEnv),
+
+		Parameters: make(map[string]string),
+	}
+	if err := requiredEnv(config.ElasticsearchHost, ElasticsearchHostEnv); err != nil {
+		return err
+	}
+	if err := requiredEnv(config.KibanaHost, KibanaHostEnv); err != nil {
+		return err
+	}
+
+	err := p.initClients()
+	if err != nil {
+		return err
+	}
+	// TODO: Migrate from serverless variables.
+	config.Parameters[ParamServerlessLocalStackVersion] = options.StackVersion
+
+	config, err = p.setupFleet(ctx, config, options)
+	if err != nil {
+		return fmt.Errorf("failed to setup Fleet: %w", err)
+	}
+
+	// We need to store the config here to be able to clean up Fleet if something
+	// fails later.
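+	// TearDown later reads the persisted parameters (e.g. whether Fleet Server is
+	// managed by elastic-package) to decide what needs to be cleaned up.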
+ err = storeConfig(options.Profile, config) + if err != nil { + return fmt.Errorf("failed to store config: %w", err) + } + + logstashEnabled := options.Profile.Config(configLogstashEnabled, "false") == "true" + if logstashEnabled { + err := addLogstashFleetOutput(ctx, p.kibana) + if err != nil { + return fmt.Errorf("failed to create logstash output: %w", err) + } + config.OutputID = fleetLogstashOutput + } else { + internalHost := DockerInternalHost(config.ElasticsearchHost) + if internalHost != config.ElasticsearchHost { + err := addElasticsearchFleetOutput(ctx, p.kibana, internalHost) + if err != nil { + return fmt.Errorf("failed to create elasticsearch output: %w", err) + } + config.OutputID = fleetElasticsearchOutput + } + } + + // We need to store the config here to be able to clean up the logstash output if something + // fails later. + err = storeConfig(options.Profile, config) + if err != nil { + return fmt.Errorf("failed to store config: %w", err) + } + + selfMonitor := options.Profile.Config(configSelfMonitorEnabled, "false") == "true" + policy, err := createAgentPolicy(ctx, p.kibana, options.StackVersion, config.OutputID, selfMonitor) + if err != nil { + return fmt.Errorf("failed to create agent policy: %w", err) + } + if config.ElasticsearchAPIKey != "" { + config.EnrollmentToken, err = p.kibana.GetEnrollmentTokenForPolicyID(ctx, policy.ID) + if err != nil { + return fmt.Errorf("failed to get an enrollment token for policy %s: %w", policy.Name, err) + } + } + + localServices := &localServicesManager{ + profile: options.Profile, + } + err = localServices.start(ctx, options, config) + if err != nil { + return fmt.Errorf("failed to start local services: %w", err) + } + + if logstashEnabled { + err = updateLogstashFleetOutput(ctx, options.Profile, p.kibana) + if err != nil { + return fmt.Errorf("cannot configure fleet output: %w", err) + } + } + + err = storeConfig(options.Profile, config) + if err != nil { + return fmt.Errorf("failed to store config: %w", err) + } + + return nil +} + +func requiredEnv(value string, envVarName string) error { + if value == "" { + return fmt.Errorf("environment variable %s required", envVarName) + } + return nil +} + +func (p *environmentProvider) initClients() error { + kibana, err := NewKibanaClient() + if err != nil { + return fmt.Errorf("cannot create Kibana client: %w", err) + } + p.kibana = kibana + + elasticsearch, err := NewElasticsearchClient() + if err != nil { + return fmt.Errorf("cannot create Elasticsearch client: %w", err) + } + p.elasticsearch = elasticsearch + + return nil +} + +func (p *environmentProvider) setupFleet(ctx context.Context, config Config, options Options) (Config, error) { + const localFleetServerURL = "https://fleet-server:8220" + + fleetServerURL, err := p.kibana.DefaultFleetServerURL(ctx) + if errors.Is(err, kibana.ErrFleetServerNotFound) || !isFleetServerReachable(ctx, fleetServerURL, config) { + // We need to setup a local Fleet Server + fleetServerURL = localFleetServerURL + config.Parameters[paramFleetServerManaged] = "true" + + host := kibana.FleetServerHost{ + ID: fleetServerHostID(options.Profile.ProfileName), + URLs: []string{fleetServerURL}, + IsDefault: true, + Name: "elastic-package-managed-fleet-server", + } + err := p.kibana.AddFleetServerHost(ctx, host) + if errors.Is(err, kibana.ErrConflict) { + err = p.kibana.UpdateFleetServerHost(ctx, host) + if err != nil { + return config, fmt.Errorf("failed to update existing Fleet Server host (id: %s): %w", host.ID, err) + } + } + if err != nil { + return 
config, fmt.Errorf("failed to add Fleet Server host: %w", err)
+		}
+
+		_, err = createFleetServerPolicy(ctx, p.kibana, options.StackVersion, options.Profile.ProfileName)
+		if err != nil {
+			return config, fmt.Errorf("failed to create agent policy for Fleet Server: %w", err)
+		}
+
+		config.FleetServiceToken, err = p.kibana.CreateFleetServiceToken(ctx)
+		if err != nil {
+			return config, fmt.Errorf("failed to create service token for Fleet Server: %w", err)
+		}
+	} else if err != nil {
+		return config, fmt.Errorf("failed to discover Fleet Server URL: %w", err)
+	}
+
+	config.Parameters[ParamServerlessFleetURL] = fleetServerURL
+	return config, nil
+}
+
+func fleetServerHostID(namespace string) string {
+	return "elastic-package-" + namespace
+}
+
+func isFleetServerReachable(ctx context.Context, address string, config Config) bool {
+	client, err := fleetserver.NewClient(address, fleetserver.APIKey(config.ElasticsearchAPIKey))
+	if err != nil {
+		return false
+	}
+	status, err := client.Status(ctx)
+	return err == nil && strings.ToLower(status.Status) == "healthy"
+}
+
+// TearDown stops and/or removes a stack.
+func (p *environmentProvider) TearDown(ctx context.Context, options Options) error {
+	localServices := &localServicesManager{
+		profile: options.Profile,
+	}
+	err := localServices.destroy(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to destroy local services: %w", err)
+	}
+
+	kibanaClient, err := NewKibanaClientFromProfile(options.Profile)
+	if err != nil {
+		return fmt.Errorf("failed to create kibana client: %w", err)
+	}
+	err = forceUnenrollAgentsWithPolicy(ctx, kibanaClient)
+	if err != nil {
+		return fmt.Errorf("failed to remove agents associated to test policy: %w", err)
+	}
+	err = deleteAgentPolicy(ctx, kibanaClient)
+	if err != nil {
+		return fmt.Errorf("failed to delete agent policy: %w", err)
+	}
+
+	config, err := LoadConfig(options.Profile)
+	if err != nil {
+		return fmt.Errorf("failed to load configuration: %w", err)
+	}
+	if managed, found := config.Parameters[paramFleetServerManaged]; found && managed == "true" {
+		err = forceUnenrollFleetServerWithPolicy(ctx, kibanaClient)
+		if err != nil {
+			return fmt.Errorf("failed to remove managed fleet servers: %w", err)
+		}
+
+		err = deleteFleetServerPolicy(ctx, kibanaClient)
+		if err != nil {
+			return fmt.Errorf("failed to delete fleet server policy: %w", err)
+		}
+	}
+
+	if config.OutputID != "" {
+		err := kibanaClient.RemoveFleetOutput(ctx, config.OutputID)
+		if err != nil {
+			return fmt.Errorf("failed to delete %s output: %w", config.OutputID, err)
+		}
+	}
+
+	return nil
+}
+
+// Update updates resources associated with a stack.
+func (p *environmentProvider) Update(context.Context, Options) error {
+	return fmt.Errorf("not implemented")
+}
+
+// Dump dumps data for debug purposes.
+func (p *environmentProvider) Dump(ctx context.Context, options DumpOptions) ([]DumpResult, error) {
+	for _, service := range options.Services {
+		if service != "elastic-agent" {
+			return nil, &ErrNotImplemented{
+				Operation: fmt.Sprintf("logs dump for service %s", service),
+				Provider:  ProviderEnvironment,
+			}
+		}
+	}
+	return Dump(ctx, options)
+}
+
+// Status obtains status information of the stack.
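+// It always reports Elasticsearch and Kibana. Fleet Server is queried directly when
+// an external one is used, and reported through the local services (together with
+// the local Elastic Agent) when it is managed by elastic-package.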
+func (p *environmentProvider) Status(ctx context.Context, options Options) ([]ServiceStatus, error) { + status := []ServiceStatus{ + p.elasticsearchStatus(ctx, options), + p.kibanaStatus(ctx, options), + } + + config, err := LoadConfig(options.Profile) + if err != nil { + return nil, fmt.Errorf("failed to load configuration: %w", err) + } + // If fleet is managed, it will be included in the local services status. + fleetManaged := true + if managed, ok := config.Parameters[paramFleetServerManaged]; !ok || managed != "true" { + fleetManaged = false + status = append(status, p.fleetStatus(ctx, options, config)) + } + + localServices := &localServicesManager{ + profile: options.Profile, + } + localStatus, err := localServices.status() + if err != nil { + return nil, fmt.Errorf("cannot obtain status of local services: %w", err) + } + if len(localStatus) == 0 { + localStatus = []ServiceStatus{{ + Name: "elastic-agent", + Version: "unknown", + Status: "missing", + }} + if fleetManaged { + localStatus = append(localStatus, ServiceStatus{ + Name: "fleet-server", + Version: "unknown", + Status: "missing", + }) + } + } + + status = append(status, localStatus...) + return status, nil +} + +func (p *environmentProvider) elasticsearchStatus(ctx context.Context, options Options) ServiceStatus { + status := ServiceStatus{ + Name: "elasticsearch", + Version: "unknown", + } + client, err := NewElasticsearchClientFromProfile(options.Profile) + if err != nil { + status.Status = "unknown: failed to create client: " + err.Error() + return status + } + + err = client.CheckHealth(ctx) + if err != nil { + status.Status = "unhealthy: " + err.Error() + } else { + status.Status = "healthy" + } + + info, err := client.Info(ctx) + if err != nil { + status.Version = "unknown" + } else if info.Version.BuildFlavor == "serverless" { + status.Version = "serverless" + } else { + status.Version = info.Version.Number + } + + return status +} + +func (p *environmentProvider) kibanaStatus(ctx context.Context, options Options) ServiceStatus { + status := ServiceStatus{ + Name: "kibana", + Version: "unknown", + } + client, err := NewKibanaClientFromProfile(options.Profile) + if err != nil { + status.Status = "unknown: failed to create client: " + err.Error() + return status + } + + err = client.CheckHealth(ctx) + if err != nil { + status.Status = "unhealthy: " + err.Error() + } else { + status.Status = "healthy" + } + + versionInfo, err := client.Version() + if err == nil { + if versionInfo.BuildFlavor == "serverless" { + status.Version = "serverless" + } else { + status.Version = versionInfo.Version() + } + } + + return status +} + +func (p *environmentProvider) fleetStatus(ctx context.Context, options Options, config Config) ServiceStatus { + status := ServiceStatus{ + Name: "fleet-server", + Version: "unknown", + } + + address, ok := config.Parameters[ParamServerlessFleetURL] + if !ok || address == "" { + status.Status = "unknown address" + return status + } + + client, err := fleetserver.NewClient(address, + fleetserver.APIKey(config.ElasticsearchAPIKey), + fleetserver.CertificateAuthority(config.CACertFile), + ) + if err != nil { + status.Status = "unknown: " + err.Error() + } + + fleetServerStatus, err := client.Status(ctx) + if err != nil { + status.Status = "unknown: " + err.Error() + } else if fleetServerStatus.Status != "" { + status.Status = strings.ToLower(fleetServerStatus.Status) + } + + if fleetServerStatus != nil { + if version := fleetServerStatus.Version.Number; version != "" { + status.Version = version + } 
else { + status.Version = "unknown" + } + } + + return status +} diff --git a/internal/stack/fleetserverpolicy.go b/internal/stack/fleetserverpolicy.go new file mode 100644 index 0000000000..8b04308ec1 --- /dev/null +++ b/internal/stack/fleetserverpolicy.go @@ -0,0 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package stack + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/elastic/elastic-package/internal/kibana" + "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/registry" +) + +const ( + managedFleetServerPolicyID = "fleet-server-managed-ep" +) + +// createFleetServerPolicy creates an agent policy with the initial configuration used for +// agents managed by elastic-package. +func createFleetServerPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, namespace string) (*kibana.Policy, error) { + policy := kibana.Policy{ + Name: "Fleet Server (elastic-package)", + ID: managedFleetServerPolicyID, + IsDefaultFleetServer: true, + Description: "Policy created by elastic-package", + Namespace: "default", + MonitoringEnabled: []string{}, + } + + newPolicy, err := kibanaClient.CreatePolicy(ctx, policy) + if errors.Is(err, kibana.ErrConflict) { + newPolicy, err = kibanaClient.GetPolicy(ctx, policy.ID) + if err != nil { + return nil, fmt.Errorf("error while getting existing policy: %w", err) + } + return newPolicy, nil + } + if err != nil { + return nil, fmt.Errorf("error while creating agent policy: %w", err) + } + + err = createFleetServerPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace) + if err != nil { + return nil, err + } + + return newPolicy, nil +} + +func createFleetServerPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error { + packages, err := registry.Production.Revisions("fleet_server", registry.SearchOptions{ + KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX), + }) + if err != nil { + return fmt.Errorf("could not get the fleet_server package version for Kibana %v: %w", stackVersion, err) + } + if len(packages) != 1 { + return fmt.Errorf("unexpected number of fleet_server package versions for Kibana %s - found %d expected 1", stackVersion, len(packages)) + } + logger.Debugf("Found %s package - version %s", packages[0].Name, packages[0].Version) + packagePolicy := kibana.PackagePolicy{ + Name: "fleet-server-ep", + PolicyID: agentPolicyID, + Namespace: namespace, + } + packagePolicy.Package.Name = "fleet_server" + packagePolicy.Package.Version = packages[0].Version + + _, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy) + if err != nil { + return fmt.Errorf("error while creating package policy: %w", err) + } + + return nil +} + +func deleteFleetServerPolicy(ctx context.Context, kibanaClient *kibana.Client) error { + err := kibanaClient.DeletePolicy(ctx, managedFleetServerPolicyID) + var notFoundError *kibana.ErrPolicyNotFound + if err != nil && !errors.As(err, ¬FoundError) { + return fmt.Errorf("failed to delete policy: %w", err) + } + + return nil +} + +func forceUnenrollFleetServerWithPolicy(ctx context.Context, kibanaClient *kibana.Client) error { + agents, err := kibanaClient.QueryAgents(ctx, fmt.Sprintf("policy_id: %s", 
managedFleetServerPolicyID)) + if err != nil { + return fmt.Errorf("error while querying agents with policy %s: %w", managedFleetServerPolicyID, err) + } + + for _, agent := range agents { + err := kibanaClient.RemoveAgent(ctx, agent) + if err != nil { + return fmt.Errorf("failed to remove agent %s: %w", agent.ID, err) + } + } + + return nil +} diff --git a/internal/stack/serverlessresources.go b/internal/stack/localresources.go similarity index 57% rename from internal/stack/serverlessresources.go rename to internal/stack/localresources.go index 058290665e..8f3fdc0c5f 100644 --- a/internal/stack/serverlessresources.go +++ b/internal/stack/localresources.go @@ -18,11 +18,17 @@ import ( "github.com/elastic/elastic-package/internal/profile" ) +const paramFleetServerManaged = "fleet_server_managed" + var ( - serverlessStackResources = []resource.Resource{ + localStackResources = []resource.Resource{ + &resource.File{ + Path: FleetServerHealthcheckFile, + Content: staticSource.File("_static/fleet-server-healthcheck.sh"), + }, &resource.File{ Path: ComposeFile, - Content: staticSource.Template("_static/serverless-docker-compose.yml.tmpl"), + Content: staticSource.Template("_static/local-services-docker-compose.yml.tmpl"), }, &resource.File{ Path: ElasticAgentEnvFile, @@ -31,7 +37,9 @@ var ( } ) -func applyServerlessResources(profile *profile.Profile, stackVersion string, config Config) error { +// applyLocalResources creates the local resources needed to run system tests when the stack +// is not local. +func applyLocalResources(profile *profile.Profile, stackVersion string, config Config) error { appConfig, err := install.Configuration(install.OptionWithStackVersion(stackVersion)) if err != nil { return fmt.Errorf("can't read application configuration: %w", err) @@ -42,15 +50,20 @@ func applyServerlessResources(profile *profile.Profile, stackVersion string, con resourceManager := resource.NewManager() resourceManager.AddFacter(resource.StaticFacter{ - "agent_version": stackVersion, - "agent_image": imageRefs.ElasticAgent, - "logstash_image": imageRefs.Logstash, - "elasticsearch_host": esHostWithPort(config.ElasticsearchHost), - "username": config.ElasticsearchUsername, - "password": config.ElasticsearchPassword, - "kibana_host": config.KibanaHost, - "fleet_url": config.Parameters[ParamServerlessFleetURL], - "logstash_enabled": profile.Config("stack.logstash_enabled", "false"), + "agent_version": stackVersion, + "agent_image": imageRefs.ElasticAgent, + "logstash_image": imageRefs.Logstash, + "elasticsearch_host": DockerInternalHost(esHostWithPort(config.ElasticsearchHost)), + "api_key": config.ElasticsearchAPIKey, + "username": config.ElasticsearchUsername, + "password": config.ElasticsearchPassword, + "kibana_host": DockerInternalHost(config.KibanaHost), + "fleet_url": config.Parameters[ParamServerlessFleetURL], + "enrollment_token": config.EnrollmentToken, + "logstash_enabled": profile.Config("stack.logstash_enabled", "false"), + "fleet_server_managed": config.Parameters[paramFleetServerManaged], + "fleet_server_policy": managedFleetServerPolicyID, + "fleet_service_token": config.FleetServiceToken, }) os.MkdirAll(stackDir, 0755) @@ -58,13 +71,13 @@ func applyServerlessResources(profile *profile.Profile, stackVersion string, con Prefix: stackDir, }) - resources := append([]resource.Resource{}, serverlessStackResources...) + resources := append([]resource.Resource{}, localStackResources...) // Keeping certificates in the profile directory for backwards compatibility reasons. 
resourceManager.RegisterProvider("certs", &resource.FileProvider{ Prefix: profile.ProfilePath, }) - certResources, err := initTLSCertificates("certs", profile.ProfilePath, tlsServicesServerless) + certResources, err := initTLSCertificates("certs", profile.ProfilePath, tlsLocalServices) if err != nil { return fmt.Errorf("failed to create TLS files: %w", err) } @@ -104,3 +117,24 @@ func esHostWithPort(host string) string { return host } + +func DockerInternalHost(host string) string { + url, err := url.Parse(host) + if err != nil { + return host + } + + ip := net.ParseIP(url.Hostname()) + if url.Hostname() == "localhost" || (ip != nil && ip.IsLoopback()) { + const hostInternal = "host.docker.internal" + if url.Port() == "" { + url.Host = hostInternal + } else { + url.Host = net.JoinHostPort(hostInternal, url.Port()) + } + + return url.String() + } + + return host +} diff --git a/internal/stack/serverlessresources_test.go b/internal/stack/localresources_test.go similarity index 50% rename from internal/stack/serverlessresources_test.go rename to internal/stack/localresources_test.go index 707ea97eb2..081ab17cda 100644 --- a/internal/stack/serverlessresources_test.go +++ b/internal/stack/localresources_test.go @@ -31,3 +31,30 @@ func TestEsHostWithPort(t *testing.T) { }) } } + +func TestDockerInternalHost(t *testing.T) { + tests := []struct { + name string + host string + want string + }{ + {"host without port", "https://hostname", "https://hostname"}, + {"host with port", "https://hostname:443", "https://hostname:443"}, + {"localhost without port", "https://localhost", "https://host.docker.internal"}, + {"localhost wit port", "https://localhost:443", "https://host.docker.internal:443"}, + {"host with path", "https://hostname/abc", "https://hostname/abc"}, + {"host with port and path", "https://hostname:443/abx", "https://hostname:443/abx"}, + {"localhost with path", "https://localhost/abc", "https://host.docker.internal/abc"}, + {"localhost with port and path", "https://localhost:443/abc", "https://host.docker.internal:443/abc"}, + {"ip with port and path", "http://127.0.1.1:443/abc", "http://host.docker.internal:443/abc"}, + {"ipv6 with port and path", "http://[::1]:443/abc", "http://host.docker.internal:443/abc"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := DockerInternalHost(tt.host); got != tt.want { + t.Errorf("dockerInternalHost() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/stack/localservices.go b/internal/stack/localservices.go new file mode 100644 index 0000000000..6c25162905 --- /dev/null +++ b/internal/stack/localservices.go @@ -0,0 +1,151 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
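+
+// This file implements the management of the docker compose project that runs
+// services locally (Fleet Server when managed by elastic-package, Elastic Agent
+// and, optionally, Logstash) next to stacks not started with the compose provider.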
+
+package stack
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+
+	"github.com/elastic/elastic-package/internal/compose"
+	"github.com/elastic/elastic-package/internal/docker"
+	"github.com/elastic/elastic-package/internal/profile"
+)
+
+type localServicesManager struct {
+	profile *profile.Profile
+}
+
+func (m *localServicesManager) start(ctx context.Context, options Options, config Config) error {
+	err := applyLocalResources(m.profile, options.StackVersion, config)
+	if err != nil {
+		return fmt.Errorf("could not initialize compose files for local services: %w", err)
+	}
+
+	project, err := m.composeProject()
+	if err != nil {
+		return fmt.Errorf("could not initialize local services compose project: %w", err)
+	}
+
+	opts := compose.CommandOptions{
+		ExtraArgs: []string{},
+	}
+	err = project.Build(ctx, opts)
+	if err != nil {
+		return fmt.Errorf("failed to build images for local services: %w", err)
+	}
+
+	if options.DaemonMode {
+		opts.ExtraArgs = append(opts.ExtraArgs, "-d")
+	}
+	if err := project.Up(ctx, opts); err != nil {
+		// At least starting on 8.6.0, fleet-server may be reconfigured or
+		// restarted after being healthy. If elastic-agent tries to enroll at
+		// this moment, it fails immediately and makes `docker-compose up` fail too.
+		// As a workaround, try to give another chance to docker-compose if only
+		// elastic-agent failed.
+		if onlyElasticAgentFailed(ctx, options) && !errors.Is(err, context.Canceled) {
+			fmt.Println("Elastic Agent failed to start, trying again.")
+			if err := project.Up(ctx, opts); err != nil {
+				return fmt.Errorf("failed to start local services: %w", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (m *localServicesManager) destroy(ctx context.Context) error {
+	project, err := m.composeProject()
+	if err != nil {
+		return fmt.Errorf("could not initialize local services compose project: %w", err)
+	}
+
+	opts := compose.CommandOptions{
+		// Remove associated volumes.
+		ExtraArgs: []string{"--volumes", "--remove-orphans"},
+	}
+	err = project.Down(ctx, opts)
+	if err != nil {
+		return fmt.Errorf("failed to destroy local services: %w", err)
+	}
+
+	return nil
+}
+
+func (m *localServicesManager) status() ([]ServiceStatus, error) {
+	var services []ServiceStatus
+	serviceStatusFunc := func(description docker.ContainerDescription) error {
+		service, err := newServiceStatus(&description)
+		if err != nil {
+			return err
+		}
+		services = append(services, *service)
+		return nil
+	}
+
+	err := m.visitDescriptions(serviceStatusFunc)
+	if err != nil {
+		return nil, err
+	}
+
+	return services, nil
+}
+
+func (m *localServicesManager) serviceNames() ([]string, error) {
+	services := []string{}
+	serviceFunc := func(description docker.ContainerDescription) error {
+		services = append(services, description.Config.Labels.ComposeService)
+		return nil
+	}
+
+	err := m.visitDescriptions(serviceFunc)
+	if err != nil {
+		return nil, err
+	}
+
+	return services, nil
+}
+
+func (m *localServicesManager) visitDescriptions(serviceFunc func(docker.ContainerDescription) error) error {
+	// Query docker directly to avoid loading environment variables (e.g. STACK_VERSION_VARIANT) and profiles.
+	project := m.composeProjectName()
+	containerIDs, err := docker.ContainerIDsWithLabel(projectLabelDockerCompose, project)
+	if err != nil {
+		return err
+	}
+
+	if len(containerIDs) == 0 {
+		return nil
+	}
+
+	containerDescriptions, err := docker.InspectContainers(containerIDs...)
+ if err != nil { + return err + } + + for _, containerDescription := range containerDescriptions { + serviceName := containerDescription.Config.Labels.ComposeService + if strings.HasSuffix(serviceName, readyServicesSuffix) { + continue + } + err := serviceFunc(containerDescription) + if err != nil { + return err + } + } + return nil +} + +func (m *localServicesManager) composeProject() (*compose.Project, error) { + composeFile := m.profile.Path(ProfileStackPath, ComposeFile) + return compose.NewProject(m.composeProjectName(), composeFile) +} + +func (m *localServicesManager) composeProjectName() string { + return DockerComposeProjectName(m.profile) +} diff --git a/internal/stack/providers.go b/internal/stack/providers.go index 4fd08cad0c..6e94714a1e 100644 --- a/internal/stack/providers.go +++ b/internal/stack/providers.go @@ -13,14 +13,16 @@ import ( ) const ( - ProviderCompose = "compose" - ProviderServerless = "serverless" + ProviderCompose = "compose" + ProviderEnvironment = "environment" + ProviderServerless = "serverless" ) var ( DefaultProvider = ProviderCompose SupportedProviders = []string{ ProviderCompose, + ProviderEnvironment, ProviderServerless, } ) @@ -55,6 +57,8 @@ func BuildProvider(name string, profile *profile.Profile) (Provider, error) { switch name { case ProviderCompose: return &composeProvider{}, nil + case ProviderEnvironment: + return newEnvironmentProvider(profile) case ProviderServerless: return newServerlessProvider(profile) } diff --git a/internal/stack/resources.go b/internal/stack/resources.go index b99147ecc7..a7d23f5df1 100644 --- a/internal/stack/resources.go +++ b/internal/stack/resources.go @@ -156,8 +156,10 @@ func applyResources(profile *profile.Profile, stackVersion string) error { "fleet_url": "https://fleet-server:8220", "elasticsearch_host": "https://elasticsearch:9200", - "username": elasticsearchUsername, - "password": elasticsearchPassword, + "api_key": "", + "username": elasticsearchUsername, + "password": elasticsearchPassword, + "enrollment_token": "", "agent_publish_ports": strings.Join(agentPorts, ","), "apm_enabled": profile.Config(configAPMEnabled, "false"), diff --git a/internal/stack/serverless.go b/internal/stack/serverless.go index 1ab9ad3e49..817a5b56bf 100644 --- a/internal/stack/serverless.go +++ b/internal/stack/serverless.go @@ -317,7 +317,7 @@ func (sp *serverlessProvider) localServicesComposeProject() (*compose.Project, e } func (sp *serverlessProvider) startLocalServices(ctx context.Context, options Options, config Config) error { - err := applyServerlessResources(sp.profile, options.StackVersion, config) + err := applyLocalResources(sp.profile, options.StackVersion, config) if err != nil { return fmt.Errorf("could not initialize compose files for local services: %w", err) } @@ -481,21 +481,6 @@ func (sp *serverlessProvider) localAgentStatus() ([]ServiceStatus, error) { return services, nil } -func localServiceNames(project string) ([]string, error) { - services := []string{} - serviceFunc := func(description docker.ContainerDescription) error { - services = append(services, description.Config.Labels.ComposeService) - return nil - } - - err := runOnLocalServices(project, serviceFunc) - if err != nil { - return nil, err - } - - return services, nil -} - func runOnLocalServices(project string, serviceFunc func(docker.ContainerDescription) error) error { // query directly to docker to avoid load environment variables (e.g. 
STACK_VERSION_VARIANT) and profiles containerIDs, err := docker.ContainerIDsWithLabel(projectLabelDockerCompose, project) diff --git a/internal/testrunner/runners/system/tester.go b/internal/testrunner/runners/system/tester.go index 53327bac6e..e24ddff368 100644 --- a/internal/testrunner/runners/system/tester.go +++ b/internal/testrunner/runners/system/tester.go @@ -325,8 +325,13 @@ func (r tester) Parallel() bool { // Run runs the system tests defined under the given folder func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { + stackConfig, err := stack.LoadConfig(r.profile) + if err != nil { + return nil, err + } + if !r.runSetup && !r.runTearDown && !r.runTestsOnly { - return r.run(ctx) + return r.run(ctx, stackConfig) } result := r.newResult("(init)") @@ -354,7 +359,7 @@ func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { } result = r.newResult(fmt.Sprintf("%s - %s", resultName, testConfig.Name())) - scenario, err := r.prepareScenario(ctx, testConfig, svcInfo) + scenario, err := r.prepareScenario(ctx, testConfig, stackConfig, svcInfo) if r.runSetup && err != nil { tdErr := r.tearDownTest(ctx) if tdErr != nil { @@ -556,18 +561,18 @@ func (r *tester) tearDownTest(ctx context.Context) error { r.removeAgentHandler = nil } - if r.deleteTestPolicyHandler != nil { - if err := r.deleteTestPolicyHandler(cleanupCtx); err != nil { + if r.shutdownAgentHandler != nil { + if err := r.shutdownAgentHandler(cleanupCtx); err != nil { return err } - r.deleteTestPolicyHandler = nil + r.shutdownAgentHandler = nil } - if r.shutdownAgentHandler != nil { - if err := r.shutdownAgentHandler(cleanupCtx); err != nil { + if r.deleteTestPolicyHandler != nil { + if err := r.deleteTestPolicyHandler(cleanupCtx); err != nil { return err } - r.shutdownAgentHandler = nil + r.deleteTestPolicyHandler = nil } return nil @@ -582,12 +587,12 @@ func (r *tester) newResult(name string) *testrunner.ResultComposer { }) } -func (r *tester) run(ctx context.Context) (results []testrunner.TestResult, err error) { +func (r *tester) run(ctx context.Context, stackConfig stack.Config) (results []testrunner.TestResult, err error) { result := r.newResult("(init)") startTesting := time.Now() - results, err = r.runTestPerVariant(ctx, result, r.configFileName, r.serviceVariant) + results, err = r.runTestPerVariant(ctx, stackConfig, result, r.configFileName, r.serviceVariant) if err != nil { return results, err } @@ -606,11 +611,6 @@ func (r *tester) run(ctx context.Context) (results []testrunner.TestResult, err } defer os.RemoveAll(tempDir) - stackConfig, err := stack.LoadConfig(r.profile) - if err != nil { - return nil, err - } - provider, err := stack.BuildProvider(stackConfig.Provider, r.profile) if err != nil { return nil, fmt.Errorf("failed to build stack provider: %w", err) @@ -634,7 +634,7 @@ func (r *tester) run(ctx context.Context) (results []testrunner.TestResult, err return results, nil } -func (r *tester) runTestPerVariant(ctx context.Context, result *testrunner.ResultComposer, cfgFile, variantName string) ([]testrunner.TestResult, error) { +func (r *tester) runTestPerVariant(ctx context.Context, stackConfig stack.Config, result *testrunner.ResultComposer, cfgFile, variantName string) ([]testrunner.TestResult, error) { svcInfo, err := r.createServiceInfo() if err != nil { return result.WithError(err) @@ -647,7 +647,7 @@ func (r *tester) runTestPerVariant(ctx context.Context, result *testrunner.Resul } logger.Debugf("Using config: %q", testConfig.Name()) - partial, err := 
r.runTest(ctx, testConfig, svcInfo) + partial, err := r.runTest(ctx, testConfig, stackConfig, svcInfo) tdErr := r.tearDownTest(ctx) if err != nil { @@ -938,7 +938,7 @@ func (r *tester) deleteDataStream(ctx context.Context, dataStream string) error return nil } -func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInfo servicedeployer.ServiceInfo) (*scenarioTest, error) { +func (r *tester) prepareScenario(ctx context.Context, config *testConfig, stackConfig stack.Config, svcInfo servicedeployer.ServiceInfo) (*scenarioTest, error) { serviceOptions := r.createServiceOptions(config.ServiceVariantName) var err error @@ -1029,9 +1029,13 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf Namespace: common.CreateTestRunID(), } // Assign the data_output_id to the agent policy to configure the output to logstash. The value is inferred from stack/_static/kibana.yml.tmpl + // TODO: Migrate from stack.logstash_enabled to the stack config. if r.profile.Config("stack.logstash_enabled", "false") == "true" { policy.DataOutputID = "fleet-logstash-output" } + if stackConfig.OutputID != "" { + policy.DataOutputID = stackConfig.OutputID + } policyToTest, err = r.kibanaClient.CreatePolicy(ctx, policy) if err != nil { return nil, fmt.Errorf("could not create test policy: %w", err) @@ -1606,7 +1610,7 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re return result.WithSuccess() } -func (r *tester) runTest(ctx context.Context, config *testConfig, svcInfo servicedeployer.ServiceInfo) ([]testrunner.TestResult, error) { +func (r *tester) runTest(ctx context.Context, config *testConfig, stackConfig stack.Config, svcInfo servicedeployer.ServiceInfo) ([]testrunner.TestResult, error) { result := r.newResult(config.Name()) if skip := testrunner.AnySkipConfig(config.Skip, r.globalTestConfig.Skip); skip != nil { @@ -1618,7 +1622,7 @@ func (r *tester) runTest(ctx context.Context, config *testConfig, svcInfo servic logger.Debugf("running test with configuration '%s'", config.Name()) - scenario, err := r.prepareScenario(ctx, config, svcInfo) + scenario, err := r.prepareScenario(ctx, config, stackConfig, svcInfo) if err != nil { return result.WithError(err) }