API-1835: intro dispatcher filters#7552
Conversation
Adds an input resource initializer runnable for OpenShiftManager that discovers input resources and wires informers, and registers it in the controller setup. Notes: discovery is still a stub and we still need to register informer handlers and add filtering.
inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters and forwards matching events. Each GVK has its own set of filters. Today these may include name/namespace checks, and in the future label selectors. Longer term, this dispatcher is expected to track which input resources are associated with which operator.
|
PR needs rebase. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
WalkthroughAdds an input resource dispatcher and initializer to discover, validate, filter, and dispatch input resource events by GVK; includes unit tests for both components, wires the initializer into the openshiftmanager controller setup, and updates module dependencies. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes ✨ Finishing touches
Comment |
|
@p0lyn0mial: This pull request references API-1835 which is a valid jira issue. Warning: The referenced jira issue has an invalid target version for the target branch this PR targets: expected the epic to target the "4.22.0" version, but no target version was set. DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the openshift-eng/jira-lifecycle-plugin repository. |
|
@p0lyn0mial: This pull request references API-1835 which is a valid jira issue. Warning: The referenced jira issue has an invalid target version for the target branch this PR targets: expected the epic to target the "4.22.0" version, but no target version was set. DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the openshift-eng/jira-lifecycle-plugin repository. |
|
/assign @benluddy @bertinatto @csrwng |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In
`@control-plane-operator/controllers/openshiftmanager/input_resource_dispatcher.go`:
- Around line 10-42: The dispatcher currently creates an unbuffered eventsCh
which makes inputResourceDispatcher.Handle block if no receiver is ready; update
newInputResourceDispatcher to create a buffered channel (eventsCh := make(chan
event.GenericEvent, <reasonableSize>)) so Handle can send without blocking
briefly (choose a small constant like 16 or make the buffer size configurable),
keep the rest of inputResourceDispatcher, Handle and filters logic unchanged,
and ensure any shutdown/consumer code can drain the channel if needed.
In
`@control-plane-operator/controllers/openshiftmanager/input_resource_initializer.go`:
- Around line 36-50: The Start method always fails because
discoverInputResources currently returns a "not implemented" error; replace the
stub so discovery is a safe no-op until real discovery is implemented: change
discoverInputResources to return an empty
map[string]*libraryinputresources.InputResources and nil (so inputResources is
empty) or alternatively early-return from Start when discovery is not ready, but
do not return an error. Update references to
inputResourceInitializer.discoverInputResources, Start,
checkSupportedInputResources, and startAndWaitForInformersFor to handle an empty
map appropriately so the operator can start successfully.
| type inputResourceEventFilter func(obj client.Object) bool | ||
|
|
||
| // inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters | ||
| // and forwards matching events. | ||
| // | ||
| // Each GVK has its own set of filters. Today these | ||
| // may include name/namespace checks, and in the future label selectors. | ||
| // | ||
| // Longer term, this dispatcher is expected to track which input resources are | ||
| // associated with which operator. | ||
| type inputResourceDispatcher struct { | ||
| eventsCh chan event.GenericEvent | ||
| filters map[schema.GroupVersionKind][]inputResourceEventFilter | ||
| } | ||
|
|
||
| func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher { | ||
| return &inputResourceDispatcher{ | ||
| eventsCh: make(chan event.GenericEvent), | ||
| filters: filters, | ||
| } | ||
| } | ||
|
|
||
| func (d *inputResourceDispatcher) Handle(gvk schema.GroupVersionKind, cObj client.Object) { | ||
| filters := d.filters[gvk] | ||
| if len(filters) == 0 { | ||
| d.eventsCh <- event.GenericEvent{Object: cObj} | ||
| return | ||
| } | ||
|
|
||
| for _, filter := range filters { | ||
| if filter(cObj) { | ||
| d.eventsCh <- event.GenericEvent{Object: cObj} | ||
| return |
There was a problem hiding this comment.
Avoid blocking producers on the dispatcher channel.
eventsCh is unbuffered, so Handle blocks until a receiver reads. If callers emit events while no consumer is ready, this can stall producers. Consider buffering (or other non-blocking dispatch) to prevent deadlocks.
🛠️ Possible fix (buffered channel)
type inputResourceEventFilter func(obj client.Object) bool
+const defaultDispatchBuffer = 100
+
// inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters
// and forwards matching events.
//
@@
func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher {
return &inputResourceDispatcher{
- eventsCh: make(chan event.GenericEvent),
+ eventsCh: make(chan event.GenericEvent, defaultDispatchBuffer),
filters: filters,
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| type inputResourceEventFilter func(obj client.Object) bool | |
| // inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters | |
| // and forwards matching events. | |
| // | |
| // Each GVK has its own set of filters. Today these | |
| // may include name/namespace checks, and in the future label selectors. | |
| // | |
| // Longer term, this dispatcher is expected to track which input resources are | |
| // associated with which operator. | |
| type inputResourceDispatcher struct { | |
| eventsCh chan event.GenericEvent | |
| filters map[schema.GroupVersionKind][]inputResourceEventFilter | |
| } | |
| func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher { | |
| return &inputResourceDispatcher{ | |
| eventsCh: make(chan event.GenericEvent), | |
| filters: filters, | |
| } | |
| } | |
| func (d *inputResourceDispatcher) Handle(gvk schema.GroupVersionKind, cObj client.Object) { | |
| filters := d.filters[gvk] | |
| if len(filters) == 0 { | |
| d.eventsCh <- event.GenericEvent{Object: cObj} | |
| return | |
| } | |
| for _, filter := range filters { | |
| if filter(cObj) { | |
| d.eventsCh <- event.GenericEvent{Object: cObj} | |
| return | |
| type inputResourceEventFilter func(obj client.Object) bool | |
| const defaultDispatchBuffer = 100 | |
| // inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters | |
| // and forwards matching events. | |
| // | |
| // Each GVK has its own set of filters. Today these | |
| // may include name/namespace checks, and in the future label selectors. | |
| // | |
| // Longer term, this dispatcher is expected to track which input resources are | |
| // associated with which operator. | |
| type inputResourceDispatcher struct { | |
| eventsCh chan event.GenericEvent | |
| filters map[schema.GroupVersionKind][]inputResourceEventFilter | |
| } | |
| func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher { | |
| return &inputResourceDispatcher{ | |
| eventsCh: make(chan event.GenericEvent, defaultDispatchBuffer), | |
| filters: filters, | |
| } | |
| } | |
| func (d *inputResourceDispatcher) Handle(gvk schema.GroupVersionKind, cObj client.Object) { | |
| filters := d.filters[gvk] | |
| if len(filters) == 0 { | |
| d.eventsCh <- event.GenericEvent{Object: cObj} | |
| return | |
| } | |
| for _, filter := range filters { | |
| if filter(cObj) { | |
| d.eventsCh <- event.GenericEvent{Object: cObj} | |
| return |
🤖 Prompt for AI Agents
In
`@control-plane-operator/controllers/openshiftmanager/input_resource_dispatcher.go`
around lines 10 - 42, The dispatcher currently creates an unbuffered eventsCh
which makes inputResourceDispatcher.Handle block if no receiver is ready; update
newInputResourceDispatcher to create a buffered channel (eventsCh := make(chan
event.GenericEvent, <reasonableSize>)) so Handle can send without blocking
briefly (choose a small constant like 16 or make the buffer size configurable),
keep the rest of inputResourceDispatcher, Handle and filters logic unchanged,
and ensure any shutdown/consumer code can drain the channel if needed.
| func (r *inputResourceInitializer) Start(ctx context.Context) error { | ||
| inputResources, err := r.discoverInputResources() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if err = r.checkSupportedInputResources(inputResources); err != nil { | ||
| return err | ||
| } | ||
| // TODO: register filters for dispatcher based on inputRes | ||
| return r.startAndWaitForInformersFor(ctx, inputResources) | ||
| } | ||
|
|
||
| func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) { | ||
| return nil, fmt.Errorf("not implemented") | ||
| } |
There was a problem hiding this comment.
Start always fails because discovery is stubbed.
discoverInputResources returns a "not implemented" error, so Start will always error and the manager will fail to start when this controller is enabled. Consider gating until discovery exists or making discovery a temporary no-op.
🛠️ Possible no-op stub to avoid startup failure
func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) {
- return nil, fmt.Errorf("not implemented")
+ // TODO: replace with real discovery; empty map keeps initializer a no-op for now.
+ return map[string]*libraryinputresources.InputResources{}, nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (r *inputResourceInitializer) Start(ctx context.Context) error { | |
| inputResources, err := r.discoverInputResources() | |
| if err != nil { | |
| return err | |
| } | |
| if err = r.checkSupportedInputResources(inputResources); err != nil { | |
| return err | |
| } | |
| // TODO: register filters for dispatcher based on inputRes | |
| return r.startAndWaitForInformersFor(ctx, inputResources) | |
| } | |
| func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) { | |
| return nil, fmt.Errorf("not implemented") | |
| } | |
| func (r *inputResourceInitializer) Start(ctx context.Context) error { | |
| inputResources, err := r.discoverInputResources() | |
| if err != nil { | |
| return err | |
| } | |
| if err = r.checkSupportedInputResources(inputResources); err != nil { | |
| return err | |
| } | |
| // TODO: register filters for dispatcher based on inputRes | |
| return r.startAndWaitForInformersFor(ctx, inputResources) | |
| } | |
| func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) { | |
| // TODO: replace with real discovery; empty map keeps initializer a no-op for now. | |
| return map[string]*libraryinputresources.InputResources{}, nil | |
| } |
🤖 Prompt for AI Agents
In
`@control-plane-operator/controllers/openshiftmanager/input_resource_initializer.go`
around lines 36 - 50, The Start method always fails because
discoverInputResources currently returns a "not implemented" error; replace the
stub so discovery is a safe no-op until real discovery is implemented: change
discoverInputResources to return an empty
map[string]*libraryinputresources.InputResources and nil (so inputResources is
empty) or alternatively early-return from Start when discovery is not ready, but
do not return an error. Update references to
inputResourceInitializer.discoverInputResources, Start,
checkSupportedInputResources, and startAndWaitForInformersFor to handle an empty
map appropriately so the operator can start successfully.
prepares matchers to filter cluster(s) resources based on inputResources
refactors tests so that they use a common function for getting a RESTMapper
c8ccec4 to
efc86c9
Compare
|
@p0lyn0mial: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: p0lyn0mial The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
| } | ||
| if def.Name != "" && obj.GetName() != def.Name { | ||
| return false | ||
| } |
There was a problem hiding this comment.
It seems like we can safely drop the checks for empty strings for both Name and Namespace.
requires #7477 and #7543
prepares matchers to filter cluster(s) resources based on inputResources
please review
feat(openshiftmanager-phase2): intro buildInputResourceFiltersandfeat(openshiftmanager-phase2): remove fakeRESTMapperFor