Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package openshiftmanager

import (
"k8s.io/apimachinery/pkg/runtime/schema"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)

type inputResourceEventFilter func(obj client.Object) bool

// inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters
// and forwards matching events.
//
// Each GVK has its own set of filters. Today these
// may include name/namespace checks, and in the future label selectors.
//
// Longer term, this dispatcher is expected to track which input resources are
// associated with which operator.
type inputResourceDispatcher struct {
eventsCh chan event.GenericEvent
filters map[schema.GroupVersionKind][]inputResourceEventFilter
}

func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher {
return &inputResourceDispatcher{
eventsCh: make(chan event.GenericEvent),
filters: filters,
}
}

func (d *inputResourceDispatcher) Handle(gvk schema.GroupVersionKind, cObj client.Object) {
filters := d.filters[gvk]
if len(filters) == 0 {
d.eventsCh <- event.GenericEvent{Object: cObj}
return
}

for _, filter := range filters {
if filter(cObj) {
d.eventsCh <- event.GenericEvent{Object: cObj}
return
Comment on lines +10 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid blocking producers on the dispatcher channel.

eventsCh is unbuffered, so Handle blocks until a receiver reads. If callers emit events while no consumer is ready, this can stall producers. Consider buffering (or other non-blocking dispatch) to prevent deadlocks.

🛠️ Possible fix (buffered channel)
 type inputResourceEventFilter func(obj client.Object) bool
 
+const defaultDispatchBuffer = 100
+
 // inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters
 // and forwards matching events.
 //
@@
 func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher {
 	return &inputResourceDispatcher{
-		eventsCh: make(chan event.GenericEvent),
+		eventsCh: make(chan event.GenericEvent, defaultDispatchBuffer),
 		filters:  filters,
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type inputResourceEventFilter func(obj client.Object) bool
// inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters
// and forwards matching events.
//
// Each GVK has its own set of filters. Today these
// may include name/namespace checks, and in the future label selectors.
//
// Longer term, this dispatcher is expected to track which input resources are
// associated with which operator.
type inputResourceDispatcher struct {
eventsCh chan event.GenericEvent
filters map[schema.GroupVersionKind][]inputResourceEventFilter
}
func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher {
return &inputResourceDispatcher{
eventsCh: make(chan event.GenericEvent),
filters: filters,
}
}
func (d *inputResourceDispatcher) Handle(gvk schema.GroupVersionKind, cObj client.Object) {
filters := d.filters[gvk]
if len(filters) == 0 {
d.eventsCh <- event.GenericEvent{Object: cObj}
return
}
for _, filter := range filters {
if filter(cObj) {
d.eventsCh <- event.GenericEvent{Object: cObj}
return
type inputResourceEventFilter func(obj client.Object) bool
const defaultDispatchBuffer = 100
// inputResourceDispatcher is a simple dispatcher that applies GVK scoped filters
// and forwards matching events.
//
// Each GVK has its own set of filters. Today these
// may include name/namespace checks, and in the future label selectors.
//
// Longer term, this dispatcher is expected to track which input resources are
// associated with which operator.
type inputResourceDispatcher struct {
eventsCh chan event.GenericEvent
filters map[schema.GroupVersionKind][]inputResourceEventFilter
}
func newInputResourceDispatcher(filters map[schema.GroupVersionKind][]inputResourceEventFilter) *inputResourceDispatcher {
return &inputResourceDispatcher{
eventsCh: make(chan event.GenericEvent, defaultDispatchBuffer),
filters: filters,
}
}
func (d *inputResourceDispatcher) Handle(gvk schema.GroupVersionKind, cObj client.Object) {
filters := d.filters[gvk]
if len(filters) == 0 {
d.eventsCh <- event.GenericEvent{Object: cObj}
return
}
for _, filter := range filters {
if filter(cObj) {
d.eventsCh <- event.GenericEvent{Object: cObj}
return
🤖 Prompt for AI Agents
In
`@control-plane-operator/controllers/openshiftmanager/input_resource_dispatcher.go`
around lines 10 - 42, The dispatcher currently creates an unbuffered eventsCh
which makes inputResourceDispatcher.Handle block if no receiver is ready; update
newInputResourceDispatcher to create a buffered channel (eventsCh := make(chan
event.GenericEvent, <reasonableSize>)) so Handle can send without blocking
briefly (choose a small constant like 16 or make the buffer size configurable),
keep the rest of inputResourceDispatcher, Handle and filters logic unchanged,
and ensure any shutdown/consumer code can drain the channel if needed.

}
}
}

func (d *inputResourceDispatcher) ResultChan() <-chan event.GenericEvent {
return d.eventsCh
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package openshiftmanager

import (
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/stretchr/testify/require"
)

func TestInputResourceDispatcherHandle(t *testing.T) {
wellKnownGVK := schema.GroupVersionKind{Group: "example.io", Version: "v1", Kind: "Widget"}
obj := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "widget-a",
Namespace: "default",
},
}

scenarios := []struct {
name string
filters map[schema.GroupVersionKind][]inputResourceEventFilter
inputGVK schema.GroupVersionKind
inputObj client.Object
expectedEvents []event.GenericEvent
}{
{
name: "dispatches matching filter",
filters: map[schema.GroupVersionKind][]inputResourceEventFilter{
wellKnownGVK: {
func(cObj client.Object) bool {
return cObj.GetName() == "widget-a"
},
},
},
inputGVK: wellKnownGVK,
inputObj: obj,
expectedEvents: []event.GenericEvent{
{Object: obj},
},
},
{
name: "does not dispatch when filters do not match",
filters: map[schema.GroupVersionKind][]inputResourceEventFilter{
wellKnownGVK: {
func(cObj client.Object) bool {
return cObj.GetName() == "widget-b"
},
},
},
inputGVK: wellKnownGVK,
inputObj: obj,
},
{
name: "dispatches when gvk has no filters",
filters: map[schema.GroupVersionKind][]inputResourceEventFilter{},
inputGVK: wellKnownGVK,
inputObj: obj,
expectedEvents: []event.GenericEvent{
{Object: obj},
},
},
{
name: "dispatches when any filter matches",
filters: map[schema.GroupVersionKind][]inputResourceEventFilter{
wellKnownGVK: {
func(cObj client.Object) bool {
return cObj.GetName() == "widget-b"
},
func(cObj client.Object) bool {
return cObj.GetNamespace() == "default"
},
},
},
inputGVK: wellKnownGVK,
inputObj: obj,
expectedEvents: []event.GenericEvent{
{Object: obj},
},
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
dispatcher := newInputResourceDispatcher(scenario.filters)
// dispatch in a goroutine for simplicity with an unbuffered channel
go dispatcher.Handle(scenario.inputGVK, scenario.inputObj)

events := readEvents(t, dispatcher.ResultChan(), len(scenario.expectedEvents))
require.Equal(t, scenario.expectedEvents, events)
ensureNoMoreEvents(t, dispatcher.ResultChan())
})
}
}

func readEvents(t *testing.T, ch <-chan event.GenericEvent, expected int) []event.GenericEvent {
if expected == 0 {
return nil
}

events := make([]event.GenericEvent, 0, expected)
for i := 0; i < expected; i++ {
select {
case evt := <-ch:
events = append(events, evt)
case <-time.After(100 * time.Millisecond):
require.Failf(t, "expected event not received", "received %d/%d events", len(events), expected)
}
}

return events
}

func ensureNoMoreEvents(t *testing.T, ch <-chan event.GenericEvent) {
select {
case ev := <-ch:
require.Failf(t, "unexpected event received", "got %+v", ev)
case <-time.After(100 * time.Millisecond):
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package openshiftmanager

import (
"context"
"fmt"

"github.com/openshift/multi-operator-manager/pkg/library/libraryinputresources"
"sigs.k8s.io/controller-runtime/pkg/client"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/controller-runtime/pkg/cache"
)

// inputResourceInitializer is responsible for discovering input resources
// required by operators and starting the corresponding informers.
//
// once all informers are successfully started and fully synced,
// the initializer completes its execution.
//
// TODO: after informer synchronization send readiness signal to the main controller.
type inputResourceInitializer struct {
managementClusterRESTMapper meta.RESTMapper
managementClusterCache cache.Cache
}

func newInputResourceInitializer(mgmtClusterRESTMapper meta.RESTMapper, mgmtClusterCache cache.Cache) *inputResourceInitializer {
return &inputResourceInitializer{
managementClusterRESTMapper: mgmtClusterRESTMapper,
managementClusterCache: mgmtClusterCache,
}
}

func (r *inputResourceInitializer) Start(ctx context.Context) error {
inputResources, err := r.discoverInputResources()
if err != nil {
return err
}
if err = r.checkSupportedInputResources(inputResources); err != nil {
return err
}
// TODO: register filters for dispatcher based on inputRes
return r.startAndWaitForInformersFor(ctx, inputResources)
}

func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) {
return nil, fmt.Errorf("not implemented")
}
Comment on lines +36 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Start always fails because discovery is stubbed.

discoverInputResources returns a "not implemented" error, so Start will always error and the manager will fail to start when this controller is enabled. Consider gating until discovery exists or making discovery a temporary no-op.

🛠️ Possible no-op stub to avoid startup failure
 func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) {
-	return nil, fmt.Errorf("not implemented")
+	// TODO: replace with real discovery; empty map keeps initializer a no-op for now.
+	return map[string]*libraryinputresources.InputResources{}, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (r *inputResourceInitializer) Start(ctx context.Context) error {
inputResources, err := r.discoverInputResources()
if err != nil {
return err
}
if err = r.checkSupportedInputResources(inputResources); err != nil {
return err
}
// TODO: register filters for dispatcher based on inputRes
return r.startAndWaitForInformersFor(ctx, inputResources)
}
func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) {
return nil, fmt.Errorf("not implemented")
}
func (r *inputResourceInitializer) Start(ctx context.Context) error {
inputResources, err := r.discoverInputResources()
if err != nil {
return err
}
if err = r.checkSupportedInputResources(inputResources); err != nil {
return err
}
// TODO: register filters for dispatcher based on inputRes
return r.startAndWaitForInformersFor(ctx, inputResources)
}
func (r *inputResourceInitializer) discoverInputResources() (map[string]*libraryinputresources.InputResources, error) {
// TODO: replace with real discovery; empty map keeps initializer a no-op for now.
return map[string]*libraryinputresources.InputResources{}, nil
}
🤖 Prompt for AI Agents
In
`@control-plane-operator/controllers/openshiftmanager/input_resource_initializer.go`
around lines 36 - 50, The Start method always fails because
discoverInputResources currently returns a "not implemented" error; replace the
stub so discovery is a safe no-op until real discovery is implemented: change
discoverInputResources to return an empty
map[string]*libraryinputresources.InputResources and nil (so inputResources is
empty) or alternatively early-return from Start when discovery is not ready, but
do not return an error. Update references to
inputResourceInitializer.discoverInputResources, Start,
checkSupportedInputResources, and startAndWaitForInformersFor to handle an empty
map appropriately so the operator can start successfully.


func (r *inputResourceInitializer) startAndWaitForInformersFor(ctx context.Context, inputResources map[string]*libraryinputresources.InputResources) error {
for operator, resources := range inputResources {
// note that for the POC we are only interested in ApplyConfigurationResources.ExactResources
// the checkSupportedInputResources ensures no other resources were provided.
//
// TODO: in the future we need to extend to full list
registeredGVK := sets.NewString()
for _, exactResource := range resources.ApplyConfigurationResources.ExactResources {
gvr := schema.GroupVersionResource{Group: exactResource.Group, Version: exactResource.Version, Resource: exactResource.Resource}
gvk, err := r.managementClusterRESTMapper.KindFor(gvr)
if err != nil {
return fmt.Errorf("unable to find Kind for %#v, for %s operator, err: %w", exactResource, operator, err)
}

if registeredGVK.Has(gvk.String()) {
continue
}

_, err = r.managementClusterCache.GetInformerForKind(ctx, gvk, cache.BlockUntilSynced(true))
if err != nil {
return err
}
// TODO: register informer event handlers

registeredGVK.Insert(gvk.String())
}
}

if !r.managementClusterCache.WaitForCacheSync(ctx) {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("caches did not sync")
}
return nil
}

// checkSupportedInputResources ensures only supported resources are present.
// this method is useful only for the POC purposes.
// in the future we will not need this method.
func (r *inputResourceInitializer) checkSupportedInputResources(inputResources map[string]*libraryinputresources.InputResources) error {
isResourceListSupported := func(resList libraryinputresources.ResourceList, areExactResourcesSupported bool, fieldPath string) error {
if !areExactResourcesSupported && len(resList.ExactResources) > 0 {
return fmt.Errorf("%v.ExactResources are unsupported for now", fieldPath)
}
if len(resList.GeneratedNameResources) > 0 {
return fmt.Errorf("%v.GeneratedNameResources are unsupported for now", fieldPath)
}
if len(resList.LabelSelectedResources) > 0 {
return fmt.Errorf("%v.LabelSelectedResources are unsupported for now", fieldPath)
}
if len(resList.ResourceReferences) > 0 {
return fmt.Errorf("%v.ResourceReferences are unsupported for now", fieldPath)
}
return nil
}

toCommonErrMsgFunc := func(operator string, err error) error {
return fmt.Errorf("unsupported input resources found for %s operator: %w", operator, err)
}
for operator, inputResource := range inputResources {
if err := isResourceListSupported(inputResource.ApplyConfigurationResources, true, "ApplyConfigurationResources"); err != nil {
return toCommonErrMsgFunc(operator, err)
}
if err := isResourceListSupported(inputResource.OperandResources.ConfigurationResources, false, "OperandResources.ConfigurationResources"); err != nil {
return toCommonErrMsgFunc(operator, err)
}
if err := isResourceListSupported(inputResource.OperandResources.ManagementResources, false, "OperandResources.ManagementResources"); err != nil {
return toCommonErrMsgFunc(operator, err)
}
if err := isResourceListSupported(inputResource.OperandResources.UserWorkloadResources, false, "OperandResources.UserWorkloadResources"); err != nil {
return toCommonErrMsgFunc(operator, err)
}
}
return nil
}

// buildInputResourceFilters prepares matchers to filter cluster(s) resources based on inputResources
func (r *inputResourceInitializer) buildInputResourceFilters(inputResources map[string]*libraryinputresources.InputResources) (map[schema.GroupVersionKind][]inputResourceEventFilter, error) {
filters := make(map[schema.GroupVersionKind][]inputResourceEventFilter)
for operator, resources := range inputResources {
// note that for the POC we are only interested in ApplyConfigurationResources.ExactResources
// the checkSupportedInputResources ensures no other resources were provided.
//
// TODO: in the future we need to extend to full list
for _, exactResource := range resources.ApplyConfigurationResources.ExactResources {
gvr := schema.GroupVersionResource{
Group: exactResource.Group,
Version: exactResource.Version,
Resource: exactResource.Resource,
}
gvk, err := r.managementClusterRESTMapper.KindFor(gvr)
if err != nil {
return nil, fmt.Errorf("unable to find Kind for %#v, for %s operator, err: %w", exactResource, operator, err)
}
filters[gvk] = append(filters[gvk], matchExactResourceFilter(exactResource))
}
}
return filters, nil
}

// matchExactResourceFilter returns a matcher that checks namespace and name when provided
func matchExactResourceFilter(def libraryinputresources.ExactResourceID) inputResourceEventFilter {
return func(obj client.Object) bool {
if def.Namespace != "" && obj.GetNamespace() != def.Namespace {
return false
}
if def.Name != "" && obj.GetName() != def.Name {
return false
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we can safely drop the checks for empty strings for both Name and Namespace.

return true
}
}
Loading