@@ -17,7 +17,9 @@ limitations under the License.
1717package recorder
1818
1919import (
20+ "context"
2021 "fmt"
22+ "sync"
2123
2224 "github.com/go-logr/logr"
2325 corev1 "k8s.io/api/core/v1"
@@ -26,35 +28,129 @@ import (
2628 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2729 "k8s.io/client-go/rest"
2830 "k8s.io/client-go/tools/record"
29- "sigs.k8s.io/controller-runtime/pkg/recorder"
3031)
3132
32- type provider struct {
33+ // EventBroadcasterProducer makes an event broadcaster, returning
34+ // whether or not the broadcaster should be stopped with the Provider,
35+ // or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
36+ type EventBroadcasterProducer func () (caster record.EventBroadcaster , stopWithProvider bool )
37+
38+ // Provider is a recorder.Provider that records events to the k8s API server
39+ // and to a logr Logger.
40+ type Provider struct {
3341 // scheme to specify when creating a recorder
3442 scheme * runtime.Scheme
35- // eventBroadcaster to create new recorder instance
36- eventBroadcaster record.EventBroadcaster
3743 // logger is the logger to use when logging diagnostic event info
38- logger logr.Logger
44+ logger logr.Logger
45+ evtClient typedcorev1.EventInterface
46+ makeBroadcaster EventBroadcasterProducer
47+
48+ broadcasterOnce sync.Once
49+ broadcaster record.EventBroadcaster
50+ stopBroadcaster bool
51+ }
52+
53+ // NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
54+ // stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
55+ // code finishes up and tries to continue emitting events.
56+
57+ // Stop attempts to stop this provider, stopping the underlying broadcaster
58+ // if the broadcaster asked to be stopped. It kinda tries to honor the given
59+ // context, but the underlying broadcaster has an indefinite wait that doesn't
60+ // return until all queued events are flushed, so this may end up just returning
61+ // before the underlying wait has finished instead of cancelling the wait.
62+ // This is Very Frustrating™.
63+ func (p * Provider ) Stop (shutdownCtx context.Context ) {
64+ doneCh := make (chan struct {})
65+
66+ go func () {
67+ // technically, this could start the broadcaster, but practically, it's
68+ // almost certainly already been started (e.g. by leader election). We
69+ // need to invoke this to ensure that we don't inadvertently race with
70+ // an invocation of getBroadcaster.
71+ broadcaster := p .getBroadcaster ()
72+ if p .stopBroadcaster {
73+ broadcaster .Shutdown ()
74+ }
75+ close (doneCh )
76+ }()
77+
78+ select {
79+ case <- shutdownCtx .Done ():
80+ case <- doneCh :
81+ }
82+ }
83+
84+ // getBroadcaster ensures that a broadcaster is started for this
85+ // provider, and returns it. It's threadsafe.
86+ func (p * Provider ) getBroadcaster () record.EventBroadcaster {
87+ // NB(directxman12): this can technically still leak if something calls
88+ // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
89+ // create the broadcaster in start, we could race with other things that
90+ // are started at the same time & want to emit events. The alternative is
91+ // silently swallowing events and more locking, but that seems suboptimal.
92+
93+ p .broadcasterOnce .Do (func () {
94+ broadcaster , stop := p .makeBroadcaster ()
95+ broadcaster .StartRecordingToSink (& typedcorev1.EventSinkImpl {Interface : p .evtClient })
96+ broadcaster .StartEventWatcher (
97+ func (e * corev1.Event ) {
98+ p .logger .V (1 ).Info (e .Type , "object" , e .InvolvedObject , "reason" , e .Reason , "message" , e .Message )
99+ })
100+ p .broadcaster = broadcaster
101+ p .stopBroadcaster = stop
102+ })
103+
104+ return p .broadcaster
39105}
40106
41107// NewProvider create a new Provider instance.
42- func NewProvider (config * rest.Config , scheme * runtime.Scheme , logger logr.Logger , broadcaster record. EventBroadcaster ) (recorder. Provider , error ) {
108+ func NewProvider (config * rest.Config , scheme * runtime.Scheme , logger logr.Logger , makeBroadcaster EventBroadcasterProducer ) (* Provider , error ) {
43109 clientSet , err := kubernetes .NewForConfig (config )
44110 if err != nil {
45111 return nil , fmt .Errorf ("failed to init clientSet: %w" , err )
46112 }
47113
48- p := & provider {scheme : scheme , logger : logger , eventBroadcaster : broadcaster }
49- p .eventBroadcaster .StartRecordingToSink (& typedcorev1.EventSinkImpl {Interface : clientSet .CoreV1 ().Events ("" )})
50- p .eventBroadcaster .StartEventWatcher (
51- func (e * corev1.Event ) {
52- p .logger .V (1 ).Info (e .Type , "object" , e .InvolvedObject , "reason" , e .Reason , "message" , e .Message )
53- })
54-
114+ p := & Provider {scheme : scheme , logger : logger , makeBroadcaster : makeBroadcaster , evtClient : clientSet .CoreV1 ().Events ("" )}
55115 return p , nil
56116}
57117
58- func (p * provider ) GetEventRecorderFor (name string ) record.EventRecorder {
59- return p .eventBroadcaster .NewRecorder (p .scheme , corev1.EventSource {Component : name })
118+ // GetEventRecorderFor returns an event recorder that broadcasts to this provider's
119+ // broadcaster. All events will be associated with a component of the given name.
120+ func (p * Provider ) GetEventRecorderFor (name string ) record.EventRecorder {
121+ return & lazyRecorder {
122+ prov : p ,
123+ name : name ,
124+ }
125+ }
126+
127+ // lazyRecorder is a recorder that doesn't actually instantiate any underlying
128+ // recorder until the first event is emitted.
129+ type lazyRecorder struct {
130+ prov * Provider
131+ name string
132+
133+ recOnce sync.Once
134+ rec record.EventRecorder
135+ }
136+
137+ // ensureRecording ensures that a concrete recorder is populated for this recorder.
138+ func (l * lazyRecorder ) ensureRecording () {
139+ l .recOnce .Do (func () {
140+ broadcaster := l .prov .getBroadcaster ()
141+ l .rec = broadcaster .NewRecorder (l .prov .scheme , corev1.EventSource {Component : l .name })
142+ })
143+ }
144+
145+ func (l * lazyRecorder ) Event (object runtime.Object , eventtype , reason , message string ) {
146+ l .ensureRecording ()
147+ l .rec .Event (object , eventtype , reason , message )
148+ }
149+ func (l * lazyRecorder ) Eventf (object runtime.Object , eventtype , reason , messageFmt string , args ... interface {}) {
150+ l .ensureRecording ()
151+ l .rec .Eventf (object , eventtype , reason , messageFmt , args ... )
152+ }
153+ func (l * lazyRecorder ) AnnotatedEventf (object runtime.Object , annotations map [string ]string , eventtype , reason , messageFmt string , args ... interface {}) {
154+ l .ensureRecording ()
155+ l .rec .AnnotatedEventf (object , annotations , eventtype , reason , messageFmt , args ... )
60156}
0 commit comments