@@ -69,7 +69,7 @@ type controllerManager struct {
6969 // (and EventHandlers, Sources and Predicates).
7070 recorderProvider recorder.Provider
7171
72- // resourceLock
72+ // resourceLock forms the basis for leader election
7373 resourceLock resourcelock.Interface
7474
7575 // mapper is used to map resources to kind, and map kind and version.
@@ -81,10 +81,16 @@ type controllerManager struct {
8181 mu sync.Mutex
8282 started bool
8383 errChan chan error
84- stop <- chan struct {}
8584
86- // stopper is the write side of the stop channel. They should have the same value.
87- stopper chan <- struct {}
85+ // internalStop is the stop channel *actually* used by everything involved
86+ // with the manager as a stop channel, so that we can pass a stop channel
87+ // to things that need it off the bat (like the Channel source). It can
88+ // be closed via `internalStopper` (by being the same underlying channel).
89+ internalStop <- chan struct {}
90+
91+ // internalStopper is the write side of the internal stop channel, allowing us to close it.
92+ // It and `internalStop` should point to the same channel.
93+ internalStopper chan <- struct {}
8894
8995 startCache func (stop <- chan struct {}) error
9096}
@@ -104,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error {
104110 if cm .started {
105111 // If already started, start the controller
106112 go func () {
107- cm .errChan <- r .Start (cm .stop )
113+ cm .errChan <- r .Start (cm .internalStop )
108114 }()
109115 }
110116
@@ -127,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
127133 if _ , err := inject .InjectorInto (cm .SetFields , i ); err != nil {
128134 return err
129135 }
130- if _ , err := inject .StopChannelInto (cm .stop , i ); err != nil {
136+ if _ , err := inject .StopChannelInto (cm .internalStop , i ); err != nil {
131137 return err
132138 }
133139 if _ , err := inject .DecoderInto (cm .admissionDecoder , i ); err != nil {
@@ -195,8 +201,8 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
195201}
196202
197203func (cm * controllerManager ) Start (stop <- chan struct {}) error {
198- // join the passed-in stop channel as an upstream feeding into cm.stopper
199- defer close (cm .stopper )
204+ // join the passed-in stop channel as an upstream feeding into cm.internalStopper
205+ defer close (cm .internalStopper )
200206
201207 if cm .resourceLock != nil {
202208 err := cm .startLeaderElection ()
@@ -226,27 +232,27 @@ func (cm *controllerManager) start() {
226232 cm .startCache = cm .cache .Start
227233 }
228234 go func () {
229- if err := cm .startCache (cm .stop ); err != nil {
235+ if err := cm .startCache (cm .internalStop ); err != nil {
230236 cm .errChan <- err
231237 }
232238 }()
233239
234240 // Start the metrics server
235241 if cm .metricsListener != nil {
236- go cm .serveMetrics (cm .stop )
242+ go cm .serveMetrics (cm .internalStop )
237243 }
238244
239245 // Wait for the caches to sync.
240246 // TODO(community): Check the return value and write a test
241- cm .cache .WaitForCacheSync (cm .stop )
247+ cm .cache .WaitForCacheSync (cm .internalStop )
242248
243249 // Start the runnables after the cache has synced
244250 for _ , c := range cm .runnables {
245251 // Controllers block, but we want to return an error if any have an error starting.
246252 // Write any Start errors to a channel so we can return them
247253 ctrl := c
248254 go func () {
249- cm .errChan <- ctrl .Start (cm .stop )
255+ cm .errChan <- ctrl .Start (cm .internalStop )
250256 }()
251257 }
252258
0 commit comments