@@ -114,7 +114,11 @@ type controllerManager struct {
114114 started bool
115115 startedLeader bool
116116 healthzStarted bool
117- errChan chan error
117+
118+ // NB(directxman12): we don't just use an error channel here to avoid the situation where the
119+ // error channel is too small and we end up blocking some goroutines waiting to report their errors.
120+ // errSignal lets us track when we should stop because an error occurred
121+ errSignal * errSignaler
118122
119123 // internalStop is the stop channel *actually* used by everything involved
120124 // with the manager as a stop channel, so that we can pass a stop channel
@@ -150,6 +154,51 @@ type controllerManager struct {
150154 retryPeriod time.Duration
151155}
152156
157+ type errSignaler struct {
158+ // errSignal indicates that an error occurred, when closed. It shouldn't
159+ // be written to.
160+ errSignal chan struct {}
161+
162+ // err is the received error
163+ err error
164+
165+ mu sync.Mutex
166+ }
167+
168+ func (r * errSignaler ) SignalError (err error ) {
169+ r .mu .Lock ()
170+ defer r .mu .Unlock ()
171+
172+ if err == nil {
173+ // non-error, ignore
174+ log .Error (nil , "SignalError called without an (with a nil) error, which should never happen, ignoring" )
175+ return
176+ }
177+
178+ if r .err != nil {
179+ // we already have an error, don't try again
180+ return
181+ }
182+
183+ // save the error and report it
184+ r .err = err
185+ close (r .errSignal )
186+ }
187+
188+ func (r * errSignaler ) Error () error {
189+ r .mu .Lock ()
190+ defer r .mu .Unlock ()
191+
192+ return r .err
193+ }
194+
195+ func (r * errSignaler ) GotError () chan struct {} {
196+ r .mu .Lock ()
197+ defer r .mu .Unlock ()
198+
199+ return r .errSignal
200+ }
201+
153202// Add sets dependencies on i, and adds it to the list of Runnables to start.
154203func (cm * controllerManager ) Add (r Runnable ) error {
155204 cm .mu .Lock ()
@@ -174,7 +223,9 @@ func (cm *controllerManager) Add(r Runnable) error {
174223 if shouldStart {
175224 // If already started, start the controller
176225 go func () {
177- cm .errChan <- r .Start (cm .internalStop )
226+ if err := r .Start (cm .internalStop ); err != nil {
227+ cm .errSignal .SignalError (err )
228+ }
178229 }()
179230 }
180231
@@ -304,15 +355,15 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
304355 go func () {
305356 log .Info ("starting metrics server" , "path" , metricsPath )
306357 if err := server .Serve (cm .metricsListener ); err != nil && err != http .ErrServerClosed {
307- cm .errChan <- err
358+ cm .errSignal . SignalError ( err )
308359 }
309360 }()
310361
311362 // Shutdown the server when stop is closed
312363 select {
313364 case <- stop :
314365 if err := server .Shutdown (context .Background ()); err != nil {
315- cm .errChan <- err
366+ cm .errSignal . SignalError ( err )
316367 }
317368 }
318369}
@@ -334,7 +385,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
334385 // Run server
335386 go func () {
336387 if err := server .Serve (cm .healthProbeListener ); err != nil && err != http .ErrServerClosed {
337- cm .errChan <- err
388+ cm .errSignal . SignalError ( err )
338389 }
339390 }()
340391 cm .healthzStarted = true
@@ -344,7 +395,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
344395 select {
345396 case <- stop :
346397 if err := server .Shutdown (context .Background ()); err != nil {
347- cm .errChan <- err
398+ cm .errSignal . SignalError ( err )
348399 }
349400 }
350401}
@@ -353,6 +404,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
353404 // join the passed-in stop channel as an upstream feeding into cm.internalStopper
354405 defer close (cm .internalStopper )
355406
407+ // initialize this here so that we reset the signal channel state on every start
408+ cm .errSignal = & errSignaler {errSignal : make (chan struct {})}
409+
356410 // Metrics should be served whether the controller is leader or not.
357411 // (If we don't serve metrics for non-leaders, prometheus will still scrape
358412 // the pod but will get a connection refused)
@@ -380,9 +434,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
380434 case <- stop :
381435 // We are done
382436 return nil
383- case err := <- cm .errChan :
437+ case <- cm .errSignal . GotError () :
384438 // Error starting a controller
385- return err
439+ return cm . errSignal . Error ()
386440 }
387441}
388442
@@ -398,7 +452,9 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
398452 // Write any Start errors to a channel so we can return them
399453 ctrl := c
400454 go func () {
401- cm .errChan <- ctrl .Start (cm .internalStop )
455+ if err := ctrl .Start (cm .internalStop ); err != nil {
456+ cm .errSignal .SignalError (err )
457+ }
402458 }()
403459 }
404460}
@@ -415,7 +471,9 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
415471 // Write any Start errors to a channel so we can return them
416472 ctrl := c
417473 go func () {
418- cm .errChan <- ctrl .Start (cm .internalStop )
474+ if err := ctrl .Start (cm .internalStop ); err != nil {
475+ cm .errSignal .SignalError (err )
476+ }
419477 }()
420478 }
421479
@@ -433,7 +491,7 @@ func (cm *controllerManager) waitForCache() {
433491 }
434492 go func () {
435493 if err := cm .startCache (cm .internalStop ); err != nil {
436- cm .errChan <- err
494+ cm .errSignal . SignalError ( err )
437495 }
438496 }()
439497
@@ -457,7 +515,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
457515 // Most implementations of leader election log.Fatal() here.
458516 // Since Start is wrapped in log.Fatal when called, we can just return
459517 // an error here which will cause the program to exit.
460- cm .errChan <- fmt .Errorf ("leader election lost" )
518+ cm .errSignal . SignalError ( fmt .Errorf ("leader election lost" ) )
461519 },
462520 },
463521 })
0 commit comments