Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions controllers/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,7 @@ const (

//DefaultCSVWaitPeriod is the default period for wait CSV ready
DefaultCSVWaitPeriod = 1 * time.Minute

//DefaultCRRetryNumber is the default maximum number of retry for reconciling a custom resource
DefaultCRRetryNumber = 3
)
196 changes: 114 additions & 82 deletions controllers/operandrequest/reconcile_operand.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"time"

olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -219,76 +220,31 @@ func (r *Reconciler) reconcileCRwithConfig(ctx context.Context, service *operato

// Create k8s resources required by service
if service.Resources != nil {
for i := range service.Resources {
res := service.Resources[i]
if res.APIVersion == "" {
return fmt.Errorf("The APIVersion of k8s resource is empty for operator " + service.Name)
}

if res.Kind == "" {
return fmt.Errorf("The Kind of k8s resource is empty for operator " + service.Name)
}
if res.Name == "" {
return fmt.Errorf("The Name of k8s resource is empty for operator " + service.Name)
}
var k8sResNs string
if res.Namespace == "" {
k8sResNs = opConfigNs
} else {
k8sResNs = res.Namespace
}

resObject, err := util.ObjectToNewUnstructured(&res)
if err != nil {
klog.Errorf("Failed to convert %s %s/%s object to unstructured.Unstructured object", res.Kind, k8sResNs, res.Name)
return err
}

if err := r.ParseValueReferenceInObject(ctx, "data", resObject.Object["data"], resObject.Object, "OperandConfig", opConfigName, opConfigNs); err != nil {
klog.Errorf("Failed to parse value reference in resource %s/%s: %v", k8sResNs, res.Name, err)
return err
}
// cover unstructured.Unstructured object to original OperandConfig object
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resObject.Object, &res); err != nil {
klog.Errorf("Failed to convert unstructured.Unstructured object to %s %s/%s object", res.Kind, k8sResNs, res.Name)
return err
}

var k8sRes unstructured.Unstructured
k8sRes.SetAPIVersion(res.APIVersion)
k8sRes.SetKind(res.Kind)
k8sRes.SetName(res.Name)
k8sRes.SetNamespace(k8sResNs)

verbs := []string{"create", "delete", "get", "update"}
if r.checkResAuth(ctx, verbs, k8sRes) {
err := r.Client.Get(ctx, types.NamespacedName{
Name: res.Name,
Namespace: k8sResNs,
}, &k8sRes)
// Get the chunk size
var chunkSize int
if r.StepSize > 0 {
chunkSize = r.StepSize
} else {
chunkSize = 1
}
var wg sync.WaitGroup
semaphore := make(chan struct{}, chunkSize)

if err != nil && !apierrors.IsNotFound(err) {
merr.Add(errors.Wrapf(err, "failed to get k8s resource %s/%s", k8sResNs, res.Name))
} else if apierrors.IsNotFound(err) {
if err := r.createK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations, &res.OwnerReferences); err != nil {
merr.Add(err)
}
} else {
if res.Force {
// Update k8s resource
klog.V(3).Info("Found existing k8s resource: " + res.Name)
if err := r.updateK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations, &res.OwnerReferences); err != nil {
merr.Add(err)
}
} else {
klog.V(2).Infof("Skip the k8s resource %s/%s which is not created by ODLM", res.Kind, res.Name)
}
for i := range service.Resources {
wg.Add(1)
semaphore <- struct{}{}
go func(res operatorv1alpha1.ConfigResource) {
defer wg.Done()
defer func() { <-semaphore }() // release semaphore
err := r.reconcileK8sResourceWithRetries(ctx, res, service.Name, opConfigName, opConfigNs)
if err != nil {
merr.Add(err)
}
} else {
klog.Infof("ODLM doesn't have enough permission to reconcile k8s resource -- Kind: %s, NamespacedName: %s/%s", res.Kind, k8sResNs, res.Name)
}
}(service.Resources[i])
}

wg.Wait()

if len(merr.Errors) != 0 {
return merr
}
Expand Down Expand Up @@ -530,6 +486,93 @@ func newServiceStatus(operatorName string, namespace string, resources []operato
return serviceSpec
}

func (r *Reconciler) reconcileK8sResourceWithRetries(ctx context.Context, res operatorv1alpha1.ConfigResource, serviceName, opConfigName, opConfigNs string) error {
var err error
for i := 0; i < int(constant.DefaultCRRetryNumber); i++ {
err = r.reconcileK8sResource(ctx, res, serviceName, opConfigName, opConfigNs)
if err == nil {
return nil
}
klog.Errorf("Failed to reconcile k8s resource -- Kind: %s, NamespacedName: %s/%s with error: %v", res.Kind, res.Namespace, res.Name, err)
if i < int(constant.DefaultCRRetryNumber)-1 {
waitTime := time.Duration((1 << i) * 4 * int(time.Second))
klog.Warningf("Retry reconcile k8s resource -- Kind: %s, NamespacedName: %s/%s after waiting %v", res.Kind, res.Namespace, res.Name, waitTime)
time.Sleep(waitTime)
}
}
return err
}

func (r *Reconciler) reconcileK8sResource(ctx context.Context, res operatorv1alpha1.ConfigResource, serviceName, opConfigName, opConfigNs string) error {
if res.APIVersion == "" {
return fmt.Errorf("The APIVersion of k8s resource is empty for operator " + serviceName)
}

if res.Kind == "" {
return fmt.Errorf("The Kind of k8s resource is empty for operator " + serviceName)
}
if res.Name == "" {
return fmt.Errorf("The Name of k8s resource is empty for operator " + serviceName)
}
var k8sResNs string
if res.Namespace == "" {
k8sResNs = opConfigNs
} else {
k8sResNs = res.Namespace
}

resObject, err := util.ObjectToNewUnstructured(&res)
if err != nil {
klog.Errorf("Failed to convert %s %s/%s object to unstructured.Unstructured object", res.Kind, k8sResNs, res.Name)
return err
}

if err := r.ParseValueReferenceInObject(ctx, "data", resObject.Object["data"], resObject.Object, "OperandConfig", opConfigName, opConfigNs); err != nil {
klog.Errorf("Failed to parse value reference in resource %s/%s: %v", k8sResNs, res.Name, err)
return err
}
// cover unstructured.Unstructured object to original OperandConfig object
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resObject.Object, &res); err != nil {
klog.Errorf("Failed to convert unstructured.Unstructured object to %s %s/%s object", res.Kind, k8sResNs, res.Name)
return err
}

var k8sRes unstructured.Unstructured
k8sRes.SetAPIVersion(res.APIVersion)
k8sRes.SetKind(res.Kind)
k8sRes.SetName(res.Name)
k8sRes.SetNamespace(k8sResNs)

verbs := []string{"create", "delete", "get", "update"}
if r.checkResAuth(ctx, verbs, k8sRes) {
err := r.Client.Get(ctx, types.NamespacedName{
Name: res.Name,
Namespace: k8sResNs,
}, &k8sRes)

if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get k8s resource %s/%s", k8sResNs, res.Name)
} else if apierrors.IsNotFound(err) {
if err := r.createK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations, &res.OwnerReferences); err != nil {
return err
}
} else {
if res.Force {
// Update k8s resource
klog.V(3).Info("Found existing k8s resource: " + res.Name)
if err := r.updateK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations, &res.OwnerReferences); err != nil {
return err
}
} else {
klog.V(2).Infof("Skip the k8s resource %s/%s which is not created by ODLM", res.Kind, res.Name)
}
}
} else {
klog.Infof("ODLM doesn't have enough permission to reconcile k8s resource -- Kind: %s, NamespacedName: %s/%s", res.Kind, k8sResNs, res.Name)
}
return nil
}

// deleteAllCustomResource remove custom resource base on OperandConfig and CSV alm-examples
func (r *Reconciler) deleteAllCustomResource(ctx context.Context, csv *olmv1alpha1.ClusterServiceVersion, requestInstance *operatorv1alpha1.OperandRequest, csc *operatorv1alpha1.OperandConfig, operandName, namespace string) error {

Expand Down Expand Up @@ -1067,8 +1110,7 @@ func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstr
// Convert existing k8s resource to string
existingK8sResRaw, err := json.Marshal(existingK8sRes.Object)
if err != nil {
klog.Error(err)
return false, err
return false, errors.Wrapf(err, "failed to marshal existing k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

// Merge the existing CR and the CR from the OperandConfig
Expand All @@ -1082,21 +1124,9 @@ func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstr
return false, errors.Wrapf(err, "failed to set ownerReferences for k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

resourceVersion := existingK8sRes.GetResourceVersion()
CRgeneration := existingK8sRes.GetGeneration()
err = r.Update(ctx, &existingK8sRes, client.DryRunAll)
if err != nil {
return false, errors.Wrapf(err, "failed to update k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

if newResourceVersion := existingK8sRes.GetResourceVersion(); resourceVersion == newResourceVersion {
// if the resourceVersion is the same, the update is not performed
klog.Infof("The k8s resource with apiversion: %s, kind: %s, %s/%s is not updated", apiversion, kind, namespace, name)
return true, nil
}

klog.Infof("updating k8s resource with apiversion: %s, kind: %s, %s/%s", apiversion, kind, namespace, name)

resourceVersion := existingK8sRes.GetResourceVersion()
err = r.Update(ctx, &existingK8sRes)

if err != nil {
Expand All @@ -1120,8 +1150,10 @@ func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstr

}

if UpdatedK8sRes.GetGeneration() != CRgeneration {
if UpdatedK8sRes.GetResourceVersion() != resourceVersion {
klog.Infof("Finish updating the k8s Resource: -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
} else {
klog.Infof("No updates on k8s resource with apiversion: %s, kind: %s, %s/%s", apiversion, kind, namespace, name)
}
}
return true, nil
Expand Down
2 changes: 1 addition & 1 deletion controllers/operator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewODLMOperator(mgr manager.Manager, name string) *ODLMOperator {
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor(name),
Scheme: mgr.GetScheme(),
MaxConcurrentReconciles: 5,
MaxConcurrentReconciles: 3,
}
}

Expand Down
21 changes: 20 additions & 1 deletion controllers/operatorconfig/operatorconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/barkimedes/go-deepcopy"
olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -169,7 +170,20 @@ func (r *Reconciler) requestsFromMapFunc(ctx context.Context) handler.MapFunc {
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
return ctrl.NewControllerManagedBy(mgr).
For(&operatorv1alpha1.OperandRequest{}).
For(&operatorv1alpha1.OperandRequest{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
DeleteFunc: func(e event.DeleteEvent) bool {
// Evaluates to false if the object has been confirmed deleted.
return !e.DeleteStateUnknown
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldObject := e.ObjectOld.(*operatorv1alpha1.OperandRequest)
newObject := e.ObjectNew.(*operatorv1alpha1.OperandRequest)
return !equality.Semantic.DeepEqual(oldObject.Spec, newObject.Spec) || !equality.Semantic.DeepEqual(oldObject.Status, newObject.Status)
},
})).
Watches(&source.Kind{Type: &operatorv1alpha1.OperatorConfig{}}, handler.EnqueueRequestsFromMapFunc(r.requestsFromMapFunc(ctx)), builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
Expand All @@ -178,6 +192,11 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
// Evaluates to false if the object has been confirmed deleted.
return !e.DeleteStateUnknown
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldObject := e.ObjectOld.(*operatorv1alpha1.OperatorConfig)
newObject := e.ObjectNew.(*operatorv1alpha1.OperatorConfig)
return !equality.Semantic.DeepEqual(oldObject.Spec, newObject.Spec)
},
})).
Complete(r)
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func main() {
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
var stepSize = flag.Int("batch-chunk-size", 1, "batch-chunk-size is used to control at most how many subscriptions will be created concurrently")
var stepSize = flag.Int("batch-chunk-size", 3, "batch-chunk-size is used to control at most how many subscriptions will be created concurrently")

flag.Parse()

Expand Down