Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,21 @@ receivers:
* A route can have many sub-routes, forming a tree.
* Routing starts from the root route.

## Note:

If you are operating a big cluster with many events, you might want to adjust the following values in configuration to
avoid throttling issues:

```
kubeQPS: 60
kubeBurst: 60
```
## Troubleshoot "Events Discarded" warning:

- If there are `client-side throttling` warnings in the event-exporter log:
Adjust the following values in configuration:
```
kubeQPS: 100
kubeBurst: 500
```
> Set `kubeBurst` to roughly match your number of events per minute.
> Set `kubeQPS` to about 1/5 of the `kubeBurst` value.
- If there is no request throttling, but events are still dropped:
Consider increasing the event cut-off age:
```
maxEventAgeSeconds: 60
```

### Opsgenie

Expand Down Expand Up @@ -271,13 +277,10 @@ Standard out is also another file in Linux. `logLevel` refers to the application
`trace`, `debug`, `info`, `warn`, `error`, `fatal` and `panic`. When not specified, default level is set to `info`. You
can use the following configuration as an example.

By default, events whose event time is more than 5 seconds old at the moment they are caught are not collected.
You can set this period, in seconds, with `throttlePeriod`. Consider increasing it to catch more events, such as "Backoff".

```yaml
logLevel: error
logFormat: json
throttlePeriod: 5
maxEventAgeSeconds: 5
route:
routes:
- match:
Expand Down
2 changes: 1 addition & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
logLevel: debug
logFormat: json
throttlePeriod: 10
maxEventAgeSeconds: 10
kubeQPS: 60
kubeBurst: 60
# namespace: my-namespace-only # Omitting it defaults to all namespaces.
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func main() {
log.Fatal().Str("log_format", cfg.LogFormat).Msg("Unknown log format")
}

if cfg.ThrottlePeriod == 0 {
cfg.ThrottlePeriod = 5
if err := cfg.Validate(); err != nil {
log.Fatal().Err(err).Msg("config validation failed")
}

kubeconfig, err := kube.GetKubernetesConfig()
Expand All @@ -84,7 +84,7 @@ func main() {
engine.OnEvent(event)
}
}
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.ThrottlePeriod, onEvent)
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.MaxEventAgeSeconds, onEvent)

ctx, cancel := context.WithCancel(context.Background())
leaderLost := make(chan bool)
Expand Down
55 changes: 45 additions & 10 deletions pkg/exporter/config.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,65 @@
package exporter

import (
"errors"
"strconv"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
"github.com/rs/zerolog/log"
)

// Config is the top-level exporter configuration. It is populated from YAML
// using the struct tags below and should be checked with Validate before use.
type Config struct {
	// LogLevel and LogFormat select the application's log verbosity and
	// output format.
	LogLevel  string `yaml:"logLevel"`
	LogFormat string `yaml:"logFormat"`

	// ThrottlePeriod is the legacy name of MaxEventAgeSeconds. Validate copies
	// it into MaxEventAgeSeconds when only this field is set, and rejects a
	// configuration that sets both.
	//
	// Deprecated: use MaxEventAgeSeconds instead.
	ThrottlePeriod int64 `yaml:"throttlePeriod"`

	// MaxEventAgeSeconds is the event cut-off age in seconds. Zero means
	// "not set"; Validate then applies the default of 5.
	MaxEventAgeSeconds int64 `yaml:"maxEventAgeSeconds"`

	ClusterName string `yaml:"clusterName,omitempty"`

	// Namespace restricts the exporter to a single namespace.
	// NOTE(review): the example config says omitting it means all
	// namespaces; confirm against the watcher.
	Namespace string `yaml:"namespace"`

	LeaderElection kube.LeaderElectionConfig `yaml:"leaderElection"`

	// Route is the top route that the events will match
	// TODO: There is currently a tight coupling with route and config, but not with receiver config and sink so
	// TODO: I am not sure what to do here.
	Route     Route                  `yaml:"route"`
	Receivers []sinks.ReceiverConfig `yaml:"receivers"`

	// KubeQPS and KubeBurst are the Kubernetes client rate-limit settings;
	// the README suggests raising them when client-side throttling warnings
	// appear in the log.
	KubeQPS   float32 `yaml:"kubeQPS,omitempty"`
	KubeBurst int     `yaml:"kubeBurst,omitempty"`
}

// Validate checks the configuration and applies defaults to unset options.
// It returns the first validation error encountered, or nil when the
// configuration is usable.
func (c *Config) Validate() error {
	// Only default handling is validated so far. Checks still to be added:
	//   - No duplicate receivers
	//   - Receivers individually
	//   - Routers recursive
	return c.validateDefaults()
}

// validateDefaults runs the validators that fill in default values. At the
// moment that is only the maxEventAgeSeconds / throttlePeriod resolution.
func (c *Config) validateDefaults() error {
	return c.validateMaxEventAgeSeconds()
}

// validateMaxEventAgeSeconds resolves the effective MaxEventAgeSeconds from
// the maxEventAgeSeconds option and the deprecated throttlePeriod option.
// A value of zero is treated as "not set".
//
//   - neither set: MaxEventAgeSeconds defaults to 5
//   - both set: an error is returned, the two options are mutually exclusive
//   - only throttlePeriod set: its value is copied into MaxEventAgeSeconds
//     and a deprecation warning is logged
//   - only maxEventAgeSeconds set: the value is used as-is
//
// The log texts are kept verbatim (including spelling) because the package
// tests assert on them.
func (c *Config) validateMaxEventAgeSeconds() error {
	switch {
	case c.ThrottlePeriod == 0 && c.MaxEventAgeSeconds == 0:
		c.MaxEventAgeSeconds = 5
		log.Info().Msg("set config.maxEventAgeSeconds=5 (default)")

	case c.ThrottlePeriod != 0 && c.MaxEventAgeSeconds != 0:
		log.Error().Msg("cannot set both throttlePeriod (depricated) and MaxEventAgeSeconds")
		// Return a self-explanatory error so callers that only surface the
		// error value still tell the user what to fix.
		return errors.New("config: throttlePeriod (deprecated) and maxEventAgeSeconds are mutually exclusive, set only maxEventAgeSeconds")

	case c.ThrottlePeriod != 0:
		logValue := strconv.FormatInt(c.ThrottlePeriod, 10)
		log.Info().Msg("config.maxEventAgeSeconds=" + logValue)
		log.Warn().Msg("config.throttlePeriod is depricated, consider using config.maxEventAgeSeconds instead")
		c.MaxEventAgeSeconds = c.ThrottlePeriod

	default:
		logValue := strconv.FormatInt(c.MaxEventAgeSeconds, 10)
		log.Info().Msg("config.maxEventAgeSeconds=" + logValue)
	}
	return nil
}
54 changes: 53 additions & 1 deletion pkg/exporter/config_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package exporter

import (
"bytes"
"testing"

"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
"testing"
)

func readConfig(t *testing.T, yml string) Config {
Expand Down Expand Up @@ -40,3 +43,52 @@ receivers:
assert.Equal(t, "v33", cfg.Route.Routes[0].Drop[0].APIVersion)
assert.Equal(t, "stdout", cfg.Route.Routes[0].Match[0].Receiver)
}


// TestValidate_IsCheckingMaxEventAgeSeconds_WhenNotSet verifies that Validate
// applies the default of 5 when neither throttlePeriod nor maxEventAgeSeconds
// is configured.
func TestValidate_IsCheckingMaxEventAgeSeconds_WhenNotSet(t *testing.T) {
	config := Config{}

	err := config.Validate()

	assert.NoError(t, err)
	// assert.Equal reports expected vs. actual on failure, unlike assert.True.
	assert.Equal(t, int64(5), config.MaxEventAgeSeconds)
}

// TestValidate_IsCheckingMaxEventAgeSeconds_WhenThrottledPeriodSet verifies
// that a configuration using only the deprecated throttlePeriod is accepted,
// that its value is carried over to MaxEventAgeSeconds, and that both the
// effective value and the deprecation warning are logged.
func TestValidate_IsCheckingMaxEventAgeSeconds_WhenThrottledPeriodSet(t *testing.T) {
	// Capture log output, and restore the global logger afterwards so the
	// buffer does not leak into other tests.
	output := &bytes.Buffer{}
	previous := log.Logger
	log.Logger = log.Logger.Output(output)
	defer func() { log.Logger = previous }()

	config := Config{
		ThrottlePeriod: 123,
	}
	err := config.Validate()

	assert.NoError(t, err)
	assert.Equal(t, int64(123), config.MaxEventAgeSeconds)
	assert.Contains(t, output.String(), "config.maxEventAgeSeconds=123")
	assert.Contains(t, output.String(), "config.throttlePeriod is depricated, consider using config.maxEventAgeSeconds instead")
}

// TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsSet verifies
// that an explicitly configured maxEventAgeSeconds is kept unchanged and that
// the effective value is logged.
func TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsSet(t *testing.T) {
	// Capture log output, and restore the global logger afterwards so the
	// buffer does not leak into other tests.
	output := &bytes.Buffer{}
	previous := log.Logger
	log.Logger = log.Logger.Output(output)
	defer func() { log.Logger = previous }()

	config := Config{
		MaxEventAgeSeconds: 123,
	}
	err := config.Validate()

	assert.NoError(t, err)
	assert.Equal(t, int64(123), config.MaxEventAgeSeconds)
	assert.Contains(t, output.String(), "config.maxEventAgeSeconds=123")
}

// TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsAndThrottledPeriodSet
// verifies that setting both throttlePeriod and maxEventAgeSeconds is
// rejected with an error and that the conflict is logged.
func TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsAndThrottledPeriodSet(t *testing.T) {
	// Capture log output, and restore the global logger afterwards so the
	// buffer does not leak into other tests.
	output := &bytes.Buffer{}
	previous := log.Logger
	log.Logger = log.Logger.Output(output)
	defer func() { log.Logger = previous }()

	config := Config{
		ThrottlePeriod:     123,
		MaxEventAgeSeconds: 321,
	}
	err := config.Validate()

	assert.Error(t, err)
	assert.Contains(t, output.String(), "cannot set both throttlePeriod (depricated) and MaxEventAgeSeconds")
}
15 changes: 13 additions & 2 deletions pkg/kube/annotations.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package kube

import (
"strings"
"sync"

lru "github.com/hashicorp/golang-lru"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"strings"
"sync"
)

type AnnotationCache struct {
Expand Down Expand Up @@ -59,3 +61,12 @@ func (a *AnnotationCache) GetAnnotationsWithCache(reference *v1.ObjectReference)
return nil, err

}

// NewMockAnnotationCache returns an AnnotationCache backed by a 1024-entry ARC
// cache that is pre-seeded with a single entry: UID "test" mapped to the
// annotations {"test": "test"}. Judging by its name it is meant as a test
// double.
//
// Only the cache field is populated; every other field of the returned
// AnnotationCache keeps its zero value.
func NewMockAnnotationCache() *AnnotationCache {
	cache, err := lru.NewARC(1024)
	if err != nil {
		// Fail loudly instead of returning a mock with a nil cache.
		panic("cannot init cache: " + err.Error())
	}
	cache.Add(types.UID("test"), map[string]string{"test": "test"})
	return &AnnotationCache{
		cache: cache,
	}
}
15 changes: 13 additions & 2 deletions pkg/kube/labels.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package kube

import (
"sync"

lru "github.com/hashicorp/golang-lru"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sync"
)

type LabelCache struct {
Expand All @@ -18,7 +20,7 @@ type LabelCache struct {
sync.RWMutex
}

func NewLabelCache(kubeconfig *rest.Config) (*LabelCache) {
func NewLabelCache(kubeconfig *rest.Config) *LabelCache {
cache, err := lru.NewARC(1024)
if err != nil {
panic("cannot init cache: " + err.Error())
Expand Down Expand Up @@ -55,3 +57,12 @@ func (l *LabelCache) GetLabelsWithCache(reference *v1.ObjectReference) (map[stri
// An non-ignorable error occurred
return nil, err
}

// NewMockLabelCache returns a LabelCache backed by a 1024-entry ARC cache that
// is pre-seeded with a single entry: UID "test" mapped to the labels
// {"test": "test"}. Judging by its name it is meant as a test double.
//
// Only the cache field is populated; every other field of the returned
// LabelCache keeps its zero value.
func NewMockLabelCache() *LabelCache {
	cache, err := lru.NewARC(1024)
	if err != nil {
		// Same handling as NewLabelCache: a cache that cannot be created is
		// a programming error, so fail loudly rather than return a nil cache.
		panic("cannot init cache: " + err.Error())
	}
	cache.Add(types.UID("test"), map[string]string{"test": "test"})
	return &LabelCache{
		cache: cache,
	}
}
Loading