diff --git a/README.md b/README.md
index 58dc39f7..71a85845 100644
--- a/README.md
+++ b/README.md
@@ -52,15 +52,21 @@ receivers:
 * A route can have many sub-routes, forming a tree.
 * Routing starts from the root route.
 
-## Note:
-
-If you are operating a big cluster with many events, you might want to adjust the following values in configuration to
-avoid throttling issues:
-
-```
-kubeQPS: 60
-kubeBurst: 60
-```
+## Troubleshooting the "Events Discarded" warning
+
+- If there are `client-side throttling` warnings in the event-exporter log,
+  adjust the following values in the configuration:
+  ```
+  kubeQPS: 100
+  kubeBurst: 500
+  ```
+  > Set `kubeBurst` to roughly match your events per minute,
+  > and `kubeQPS` to about one fifth of the burst.
+- If there is no request throttling but events are still being dropped,
+  consider increasing the event cut-off age:
+  ```
+  maxEventAgeSeconds: 60
+  ```
 
 ### Opsgenie
 
@@ -271,13 +277,10 @@ Standard out is also another file in Linux. `logLevel` refers to the application
 `trace`, `debug`, `info`, `warn`, `error`, `fatal` and `panic`. When not specified, default level is set to `info`.
 You can use the following configuration as an example.
 
-By default, events emit with eventime > 5seconds since catching are not collected.
-You can set this period with throttlePeriod in seconds. Consider to increase time of seconds to catch more events like "Backoff".
-
 ```yaml
 logLevel: error
 logFormat: json
-throttlePeriod: 5
+maxEventAgeSeconds: 5
 route:
   routes:
     - match:
diff --git a/config.example.yaml b/config.example.yaml
index f8516b80..e5bce965 100644
--- a/config.example.yaml
+++ b/config.example.yaml
@@ -1,6 +1,6 @@
 logLevel: debug
 logFormat: json
-throttlePeriod: 10
+maxEventAgeSeconds: 10
 kubeQPS: 60
 kubeBurst: 60
 # namespace: my-namespace-only # Omitting it defaults to all namespaces.
diff --git a/main.go b/main.go
index 209064fb..1d072f63 100644
--- a/main.go
+++ b/main.go
@@ -63,8 +63,8 @@ func main() {
 		log.Fatal().Str("log_format", cfg.LogFormat).Msg("Unknown log format")
 	}
 
-	if cfg.ThrottlePeriod == 0 {
-		cfg.ThrottlePeriod = 5
+	if err := cfg.Validate(); err != nil {
+		log.Fatal().Err(err).Msg("config validation failed")
 	}
 
 	kubeconfig, err := kube.GetKubernetesConfig()
@@ -84,7 +84,7 @@ func main() {
 			engine.OnEvent(event)
 		}
 	}
-	w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.ThrottlePeriod, onEvent)
+	w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, cfg.MaxEventAgeSeconds, onEvent)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	leaderLost := make(chan bool)
diff --git a/pkg/exporter/config.go b/pkg/exporter/config.go
index 09614e19..e75f4c49 100644
--- a/pkg/exporter/config.go
+++ b/pkg/exporter/config.go
@@ -1,8 +1,12 @@
 package exporter
 
 import (
+	"errors"
+	"strconv"
+
 	"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
 	"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
+	"github.com/rs/zerolog/log"
 )
 
 // Config allows configuration
@@ -10,21 +14,52 @@ type Config struct {
 	// Route is the top route that the events will match
 	// TODO: There is currently a tight coupling with route and config, but not with receiver config and sink so
 	// TODO: I am not sure what to do here.
-	LogLevel       string                    `yaml:"logLevel"`
-	LogFormat      string                    `yaml:"logFormat"`
-	ThrottlePeriod int64                     `yaml:"throttlePeriod"`
-	ClusterName    string                    `yaml:"clusterName,omitempty"`
-	Namespace      string                    `yaml:"namespace"`
-	LeaderElection kube.LeaderElectionConfig `yaml:"leaderElection"`
-	Route          Route                     `yaml:"route"`
-	Receivers      []sinks.ReceiverConfig    `yaml:"receivers"`
-	KubeQPS        float32                   `yaml:"kubeQPS,omitempty"`
-	KubeBurst      int                       `yaml:"kubeBurst,omitempty"`
+	LogLevel           string                    `yaml:"logLevel"`
+	LogFormat          string                    `yaml:"logFormat"`
+	ThrottlePeriod     int64                     `yaml:"throttlePeriod"`
+	MaxEventAgeSeconds int64                     `yaml:"maxEventAgeSeconds"`
+	ClusterName        string                    `yaml:"clusterName,omitempty"`
+	Namespace          string                    `yaml:"namespace"`
+	LeaderElection     kube.LeaderElectionConfig `yaml:"leaderElection"`
+	Route              Route                     `yaml:"route"`
+	Receivers          []sinks.ReceiverConfig    `yaml:"receivers"`
+	KubeQPS            float32                   `yaml:"kubeQPS,omitempty"`
+	KubeBurst          int                       `yaml:"kubeBurst,omitempty"`
 }
 
 func (c *Config) Validate() error {
+	if err := c.validateDefaults(); err != nil {
+		return err
+	}
+
 	// No duplicate receivers
 	// Receivers individually
 	// Routers recursive
 	return nil
 }
+
+func (c *Config) validateDefaults() error {
+	if err := c.validateMaxEventAgeSeconds(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *Config) validateMaxEventAgeSeconds() error {
+	if c.ThrottlePeriod == 0 && c.MaxEventAgeSeconds == 0 {
+		c.MaxEventAgeSeconds = 5
+		log.Info().Msg("set config.maxEventAgeSeconds=5 (default)")
+	} else if c.ThrottlePeriod != 0 && c.MaxEventAgeSeconds != 0 {
+		log.Error().Msg("cannot set both throttlePeriod (deprecated) and maxEventAgeSeconds")
+		return errors.New("validateMaxEventAgeSeconds failed")
+	} else if c.ThrottlePeriod != 0 {
+		logValue := strconv.FormatInt(c.ThrottlePeriod, 10)
+		log.Info().Msg("config.maxEventAgeSeconds=" + logValue)
+		log.Warn().Msg("config.throttlePeriod is deprecated, consider using config.maxEventAgeSeconds instead")
+		c.MaxEventAgeSeconds = c.ThrottlePeriod
+	} else {
+		logValue := strconv.FormatInt(c.MaxEventAgeSeconds, 10)
+		log.Info().Msg("config.maxEventAgeSeconds=" + logValue)
+	}
+	return nil
+}
diff --git a/pkg/exporter/config_test.go b/pkg/exporter/config_test.go
index 6396278f..0e5e8a17 100644
--- a/pkg/exporter/config_test.go
+++ b/pkg/exporter/config_test.go
@@ -1,9 +1,12 @@
 package exporter
 
 import (
+	"bytes"
+	"testing"
+
+	"github.com/rs/zerolog/log"
 	"github.com/stretchr/testify/assert"
 	"gopkg.in/yaml.v2"
-	"testing"
 )
 
 func readConfig(t *testing.T, yml string) Config {
@@ -40,3 +43,52 @@ receivers:
 	assert.Equal(t, "v33", cfg.Route.Routes[0].Drop[0].APIVersion)
 	assert.Equal(t, "stdout", cfg.Route.Routes[0].Match[0].Receiver)
 }
+
+
+func TestValidate_IsCheckingMaxEventAgeSeconds_WhenNotSet(t *testing.T) {
+	config := Config{}
+	err := config.Validate()
+	assert.True(t, config.MaxEventAgeSeconds == 5)
+	assert.NoError(t, err)
+}
+
+func TestValidate_IsCheckingMaxEventAgeSeconds_WhenThrottledPeriodSet(t *testing.T) {
+	output := &bytes.Buffer{}
+	log.Logger = log.Logger.Output(output)
+
+	config := Config{
+		ThrottlePeriod: 123,
+	}
+	err := config.Validate()
+
+	assert.True(t, config.MaxEventAgeSeconds == 123)
+	assert.Contains(t, output.String(), "config.maxEventAgeSeconds=123")
+	assert.Contains(t, output.String(), "config.throttlePeriod is deprecated, consider using config.maxEventAgeSeconds instead")
+	assert.NoError(t, err)
+}
+
+func TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsSet(t *testing.T) {
+	output := &bytes.Buffer{}
+	log.Logger = log.Logger.Output(output)
+
+	config := Config{
+		MaxEventAgeSeconds: 123,
+	}
+	err := config.Validate()
+	assert.True(t, config.MaxEventAgeSeconds == 123)
+	assert.Contains(t, output.String(), "config.maxEventAgeSeconds=123")
+	assert.NoError(t, err)
+}
+
+func TestValidate_IsCheckingMaxEventAgeSeconds_WhenMaxEventAgeSecondsAndThrottledPeriodSet(t *testing.T) {
+	output := &bytes.Buffer{}
+	log.Logger = log.Logger.Output(output)
+
+	config := Config{
+		ThrottlePeriod:     123,
+		MaxEventAgeSeconds: 321,
+	}
+	err := config.Validate()
+	assert.Error(t, err)
+	assert.Contains(t, output.String(), "cannot set both throttlePeriod (deprecated) and maxEventAgeSeconds")
+}
\ No newline at end of file
diff --git a/pkg/kube/annotations.go b/pkg/kube/annotations.go
index 4f0470ad..ae47a434 100644
--- a/pkg/kube/annotations.go
+++ b/pkg/kube/annotations.go
@@ -1,14 +1,16 @@
 package kube
 
 import (
+	"strings"
+	"sync"
+
 	lru "github.com/hashicorp/golang-lru"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"strings"
-	"sync"
 )
 
 type AnnotationCache struct {
@@ -59,3 +61,12 @@ func (a *AnnotationCache) GetAnnotationsWithCache(reference *v1.ObjectReference)
 
 	return nil, err
 }
+
+func NewMockAnnotationCache() *AnnotationCache {
+	cache, _ := lru.NewARC(1024)
+	uid := types.UID("test")
+	cache.Add(uid, map[string]string{"test": "test"})
+	return &AnnotationCache{
+		cache: cache,
+	}
+}
diff --git a/pkg/kube/labels.go b/pkg/kube/labels.go
index 76e182e0..e7eaf8e3 100644
--- a/pkg/kube/labels.go
+++ b/pkg/kube/labels.go
@@ -1,13 +1,15 @@
 package kube
 
 import (
+	"sync"
+
 	lru "github.com/hashicorp/golang-lru"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"sync"
 )
 
 type LabelCache struct {
@@ -18,7 +20,7 @@ type LabelCache struct {
 	sync.RWMutex
 }
 
-func NewLabelCache(kubeconfig *rest.Config) (*LabelCache) {
+func NewLabelCache(kubeconfig *rest.Config) *LabelCache {
 	cache, err := lru.NewARC(1024)
 	if err != nil {
 		panic("cannot init cache: " + err.Error())
@@ -55,3 +57,12 @@ func (l *LabelCache) GetLabelsWithCache(reference *v1.ObjectReference) (map[stri
 	// An non-ignorable error occurred
 	return nil, err
 }
+
+func NewMockLabelCache() *LabelCache {
+	cache, _ := lru.NewARC(1024)
+	uid := types.UID("test")
+	cache.Add(uid, map[string]string{"test": "test"})
+	return &LabelCache{
+		cache: cache,
+	}
+}
diff --git a/pkg/kube/watcher.go b/pkg/kube/watcher.go
index c7f490d2..5ad1e762 100644
--- a/pkg/kube/watcher.go
+++ b/pkg/kube/watcher.go
@@ -18,35 +18,41 @@ var (
 		Name: "events_sent",
 		Help: "The total number of events sent",
 	})
+	eventsDiscarded = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "events_discarded",
+		Help: "The total number of events discarded because of being older than the maxEventAgeSeconds specified",
+	})
 	watchErrors = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "watch_errors",
 		Help: "The total number of errors received from the informer",
 	})
+
+	startUpTime = time.Now()
 )
 
 type EventHandler func(event *EnhancedEvent)
 
 type EventWatcher struct {
-	informer        cache.SharedInformer
-	stopper         chan struct{}
-	labelCache      *LabelCache
-	annotationCache *AnnotationCache
-	fn              EventHandler
-	throttlePeriod  time.Duration
+	informer           cache.SharedInformer
+	stopper            chan struct{}
+	labelCache         *LabelCache
+	annotationCache    *AnnotationCache
+	fn                 EventHandler
+	maxEventAgeSeconds time.Duration
 }
 
-func NewEventWatcher(config *rest.Config, namespace string, throttlePeriod int64, fn EventHandler) *EventWatcher {
+func NewEventWatcher(config *rest.Config, namespace string, maxEventAgeSeconds int64, fn EventHandler) *EventWatcher {
 	clientset := kubernetes.NewForConfigOrDie(config)
 	factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace))
 	informer := factory.Core().V1().Events().Informer()
 
 	watcher := &EventWatcher{
-		informer:        informer,
-		stopper:         make(chan struct{}),
-		labelCache:      NewLabelCache(config),
-		annotationCache: NewAnnotationCache(config),
-		fn:              fn,
-		throttlePeriod:  time.Second * time.Duration(throttlePeriod),
+		informer:           informer,
+		stopper:            make(chan struct{}),
+		labelCache:         NewLabelCache(config),
+		annotationCache:    NewAnnotationCache(config),
+		fn:                 fn,
+		maxEventAgeSeconds: time.Second * time.Duration(maxEventAgeSeconds),
 	}
 
 	informer.AddEventHandler(watcher)
@@ -63,18 +69,34 @@ func (e *EventWatcher) OnAdd(obj interface{}) {
 }
 
 func (e *EventWatcher) OnUpdate(oldObj, newObj interface{}) {
-	event := newObj.(*corev1.Event)
-	e.onEvent(event)
+	// Ignore updates
 }
 
-func (e *EventWatcher) onEvent(event *corev1.Event) {
-	// TODO: Re-enable this after development
-	// It's probably an old event we are catching, it's not the best way but anyways
+// Ignore events older than the maxEventAgeSeconds
+func (e *EventWatcher) isEventDiscarded(event *corev1.Event) bool {
 	timestamp := event.LastTimestamp.Time
 	if timestamp.IsZero() {
 		timestamp = event.EventTime.Time
 	}
-	if time.Since(timestamp) > e.throttlePeriod {
+	eventAge := time.Since(timestamp)
+	if eventAge > e.maxEventAgeSeconds {
+		// Log discarded events only if they were created after the watcher started
+		// (to suppress warnings from the initial synchronization)
+		if timestamp.After(startUpTime) {
+			log.Warn().
+				Str("event age", eventAge.String()).
+				Str("event namespace", event.Namespace).
+				Str("event name", event.Name).
+				Msg("Event discarded as being older than maxEventAgeSeconds")
+			eventsDiscarded.Inc()
+		}
+		return true
+	}
+	return false
+}
+
+func (e *EventWatcher) onEvent(event *corev1.Event) {
+	if e.isEventDiscarded(event) {
 		return
 	}
 
@@ -118,7 +140,6 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
 	}
 
 	e.fn(ev)
-	return
 }
 
 func (e *EventWatcher) OnDelete(obj interface{}) {
@@ -133,3 +154,17 @@ func (e *EventWatcher) Stop() {
 	e.stopper <- struct{}{}
 	close(e.stopper)
 }
+
+func NewMockEventWatcher(maxEventAgeSeconds int64) *EventWatcher {
+	watcher := &EventWatcher{
+		labelCache:         NewMockLabelCache(),
+		annotationCache:    NewMockAnnotationCache(),
+		maxEventAgeSeconds: time.Second * time.Duration(maxEventAgeSeconds),
+		fn:                 func(event *EnhancedEvent) {},
+	}
+	return watcher
+}
+
+func (e *EventWatcher) SetStartUpTime(t time.Time) {
+	startUpTime = t
+}
diff --git a/pkg/kube/watcher_test.go b/pkg/kube/watcher_test.go
new file mode 100644
index 00000000..28b6359f
--- /dev/null
+++ b/pkg/kube/watcher_test.go
@@ -0,0 +1,155 @@
+package kube
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog/log"
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestEventWatcher_EventAge_whenEventCreatedBeforeStartup(t *testing.T) {
+	// should not discard events as old as 300s=5m
+	var maxEventAgeSeconds int64 = 300
+	ew := NewMockEventWatcher(maxEventAgeSeconds)
+	output := &bytes.Buffer{}
+	log.Logger = log.Logger.Output(output)
+
+	// event is 3m before startup time -> expect silently dropped
+	startup := time.Now().Add(-10 * time.Minute)
+	ew.SetStartUpTime(startup)
+	event1 := corev1.Event{
+		LastTimestamp: metav1.Time{Time: startup.Add(-3 * time.Minute)},
+	}
+
+	// event is 3m before startup time -> expect silently dropped
+	assert.True(t, ew.isEventDiscarded(&event1))
+	assert.NotContains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event1)
+	assert.NotContains(t, output.String(), "Received event")
+
+	event2 := corev1.Event{
+		EventTime: metav1.MicroTime{Time: startup.Add(-3 * time.Minute)},
+	}
+
+	assert.True(t, ew.isEventDiscarded(&event2))
+	assert.NotContains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event2)
+	assert.NotContains(t, output.String(), "Received event")
+
+	// event is 3m before startup time -> expect silently dropped
+	event3 := corev1.Event{
+		LastTimestamp: metav1.Time{Time: startup.Add(-3 * time.Minute)},
+		EventTime:     metav1.MicroTime{Time: startup.Add(-3 * time.Minute)},
+	}
+
+	assert.True(t, ew.isEventDiscarded(&event3))
+	assert.NotContains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event3)
+	assert.NotContains(t, output.String(), "Received event")
+}
+
+func TestEventWatcher_EventAge_whenEventCreatedAfterStartupAndBeforeMaxAge(t *testing.T) {
+	// should not discard events as old as 300s=5m
+	var maxEventAgeSeconds int64 = 300
+	ew := NewMockEventWatcher(maxEventAgeSeconds)
+	output := &bytes.Buffer{}
+	log.Logger = log.Logger.Output(output)
+
+	startup := time.Now().Add(-10 * time.Minute)
+	ew.SetStartUpTime(startup)
+	// event is 8m after startup time (2m in max age) -> expect processed
+	event1 := corev1.Event{
+		InvolvedObject: corev1.ObjectReference{
+			UID:  "test",
+			Name: "test-1",
+		},
+		LastTimestamp: metav1.Time{Time: startup.Add(8 * time.Minute)},
+	}
+
+	assert.False(t, ew.isEventDiscarded(&event1))
+	assert.NotContains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event1)
+	assert.Contains(t, output.String(), "test-1")
+	assert.Contains(t, output.String(), "Received event")
+
+	// event is 8m after startup time (2m in max age) -> expect processed
+	event2 := corev1.Event{
+		InvolvedObject: corev1.ObjectReference{
+			UID:  "test",
+			Name: "test-2",
+		},
+		EventTime: metav1.MicroTime{Time: startup.Add(8 * time.Minute)},
+	}
+
+	assert.False(t, ew.isEventDiscarded(&event2))
+	assert.NotContains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event2)
+	assert.Contains(t, output.String(), "test-2")
+	assert.Contains(t, output.String(), "Received event")
+
+	// event is 8m after startup time (2m in max age) -> expect processed
+	event3 := corev1.Event{
+		InvolvedObject: corev1.ObjectReference{
+			UID:  "test",
+			Name: "test-3",
+		},
+		LastTimestamp: metav1.Time{Time: startup.Add(8 * time.Minute)},
+		EventTime:     metav1.MicroTime{Time: startup.Add(8 * time.Minute)},
+	}
+
+	assert.False(t, ew.isEventDiscarded(&event3))
+	assert.NotContains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event3)
+	assert.Contains(t, output.String(), "test-3")
+	assert.Contains(t, output.String(), "Received event")
+}
+
+func TestEventWatcher_EventAge_whenEventCreatedAfterStartupAndAfterMaxAge(t *testing.T) {
+	// should not discard events as old as 300s=5m
+	var maxEventAgeSeconds int64 = 300
+	ew := NewMockEventWatcher(maxEventAgeSeconds)
+	output := &bytes.Buffer{}
+	log.Logger = log.Logger.Output(output)
+
+	// event is 3m after startup time (and 2m after max age) -> expect dropped with warn
+	startup := time.Now().Add(-10 * time.Minute)
+	ew.SetStartUpTime(startup)
+	event1 := corev1.Event{
+		ObjectMeta:    metav1.ObjectMeta{Name: "event1"},
+		LastTimestamp: metav1.Time{Time: startup.Add(3 * time.Minute)},
+	}
+	assert.True(t, ew.isEventDiscarded(&event1))
+	assert.Contains(t, output.String(), "event1")
+	assert.Contains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event1)
+	assert.NotContains(t, output.String(), "Received event")
+
+	// event is 3m after startup time (and 2m after max age) -> expect dropped with warn
+	event2 := corev1.Event{
+		ObjectMeta: metav1.ObjectMeta{Name: "event2"},
+		EventTime:  metav1.MicroTime{Time: startup.Add(3 * time.Minute)},
+	}
+
+	assert.True(t, ew.isEventDiscarded(&event2))
+	assert.Contains(t, output.String(), "event2")
+	assert.Contains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event2)
+	assert.NotContains(t, output.String(), "Received event")
+
+	// event is 3m after startup time (and 2m after max age) -> expect dropped with warn
+	event3 := corev1.Event{
+		ObjectMeta:    metav1.ObjectMeta{Name: "event3"},
+		LastTimestamp: metav1.Time{Time: startup.Add(3 * time.Minute)},
+		EventTime:     metav1.MicroTime{Time: startup.Add(3 * time.Minute)},
+	}
+
+	assert.True(t, ew.isEventDiscarded(&event3))
+	assert.Contains(t, output.String(), "event3")
+	assert.Contains(t, output.String(), "Event discarded as being older than maxEventAgeSeconds")
+	ew.onEvent(&event3)
+	assert.NotContains(t, output.String(), "Received event")
+}