Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 51 additions & 82 deletions controllers/operandrequest/reconcile_operand.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
Expand Down Expand Up @@ -207,11 +208,6 @@ func (r *Reconciler) reconcileCRwithConfig(ctx context.Context, service *operato
k8sRes.SetName(res.Name)
k8sRes.SetNamespace(k8sResNs)

var k8sResConfig []byte
if res.Data != nil {
k8sResConfig = res.Data.Raw
}

err := r.Client.Get(ctx, types.NamespacedName{
Name: res.Name,
Namespace: k8sResNs,
Expand All @@ -220,14 +216,14 @@ func (r *Reconciler) reconcileCRwithConfig(ctx context.Context, service *operato
if err != nil && !apierrors.IsNotFound(err) {
merr.Add(errors.Wrapf(err, "failed to get k8s resource %s/%s", k8sResNs, res.Name))
} else if apierrors.IsNotFound(err) {
if err := r.createK8sResource(ctx, k8sRes, k8sResConfig, res.Labels, res.Annotations); err != nil {
if err := r.createK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations); err != nil {
merr.Add(err)
}
} else {
if checkLabel(k8sRes, map[string]string{constant.OpreqLabel: "true"}) && res.Force {
// Update k8s resource
klog.V(3).Info("Found existing k8s resource: " + res.Name)
if err := r.updateK8sResource(ctx, k8sRes, k8sResConfig, res.Labels, res.Annotations); err != nil {
if err := r.updateK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations); err != nil {
merr.Add(err)
}
} else {
Expand Down Expand Up @@ -775,19 +771,21 @@ func (r *Reconciler) checkCustomResource(ctx context.Context, requestInstance *o
return nil
}

func (r *Reconciler) createK8sResource(ctx context.Context, k8sResTemplate unstructured.Unstructured, k8sResConfig []byte, newLabels, newAnnotations map[string]string) error {
func (r *Reconciler) createK8sResource(ctx context.Context, k8sResTemplate unstructured.Unstructured, k8sResConfig *runtime.RawExtension, newLabels, newAnnotations map[string]string) error {
kind := k8sResTemplate.GetKind()
name := k8sResTemplate.GetName()
namespace := k8sResTemplate.GetNamespace()

k8sResConfigDecoded := make(map[string]interface{})
k8sResConfigUnmarshalErr := json.Unmarshal(k8sResConfig, &k8sResConfigDecoded)
if k8sResConfigUnmarshalErr != nil {
klog.Errorf("failed to unmarshal k8s Resource Config: %v", k8sResConfigUnmarshalErr)
}
if k8sResConfig != nil {
k8sResConfigDecoded := make(map[string]interface{})
k8sResConfigUnmarshalErr := json.Unmarshal(k8sResConfig.Raw, &k8sResConfigDecoded)
if k8sResConfigUnmarshalErr != nil {
klog.Errorf("failed to unmarshal k8s Resource Config: %v", k8sResConfigUnmarshalErr)
}

for k, v := range k8sResConfigDecoded {
k8sResTemplate.Object[k] = v
for k, v := range k8sResConfigDecoded {
k8sResTemplate.Object[k] = v
}
}

ensureLabel(k8sResTemplate, map[string]string{constant.OpreqLabel: "true"})
Expand All @@ -805,89 +803,60 @@ func (r *Reconciler) createK8sResource(ctx context.Context, k8sResTemplate unstr
return nil
}

func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstructured.Unstructured, k8sResConfig []byte, newLabels, newAnnotations map[string]string) error {
func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstructured.Unstructured, k8sResConfig *runtime.RawExtension, newLabels, newAnnotations map[string]string) error {
kind := existingK8sRes.GetKind()
apiversion := existingK8sRes.GetAPIVersion()
name := existingK8sRes.GetName()
namespace := existingK8sRes.GetNamespace()

// Update the k8s res
err := wait.PollImmediate(constant.DefaultCRFetchPeriod, constant.DefaultCRFetchTimeout, func() (bool, error) {
existingK8sRes = unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiversion,
"kind": kind,
},
}

existingK8sRes := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiversion,
"kind": kind,
},
}
if err := r.Client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &existingK8sRes); err != nil {
return errors.Wrapf(err, "failed to get k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

if !checkLabel(existingK8sRes, map[string]string{constant.OpreqLabel: "true"}) {
return nil
}

// delete the existing k8s resource, some resources could not be updated after it has been created
if err := r.Client.Delete(ctx, &existingK8sRes); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this line cause the k8s resource to get deleted in every reconcile loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when ODLM is reconciling, and the resources is forced to update and label is correct. It will be deleted and re-created

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But even if we use the Update(), the resource will be updated in each reconcile loop except the resource could not be updated.

One solution is to compare the existing resource with the new template, and decide whether we should update it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could I ask which k8s resouce, that can't be updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like job, some of fields in template is immutable, so we could not update it.

return errors.Wrapf(err, "failed to delete existing k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

// wait for the resource has been deleted
err := wait.PollImmediate(constant.DefaultCRFetchPeriod, constant.DefaultCRFetchTimeout, func() (bool, error) {
err := r.Client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &existingK8sRes)

if err != nil {
return false, errors.Wrapf(err, "failed to get k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

if !checkLabel(existingK8sRes, map[string]string{constant.OpreqLabel: "true"}) {
if err == nil {
// continue wait for the deletion
return false, nil
} else if err != nil && apierrors.IsNotFound(err) {
return true, nil
}

// isEqual := checkAnnotation(existingK8sRes, newAnnotations) && checkLabel(existingK8sRes, newLabels)
k8sResConfigDecoded := make(map[string]interface{})
k8sResConfigUnmarshalErr := json.Unmarshal(k8sResConfig, &k8sResConfigDecoded)
if k8sResConfigUnmarshalErr != nil {
klog.Errorf("failed to unmarshal k8s Resource Config: %v", k8sResConfigUnmarshalErr)
}

for k, v := range k8sResConfigDecoded {
// isEqual = isEqual && reflect.DeepEqual(existingK8sRes.Object[k], v)
existingK8sRes.Object[k] = v
}

CRgeneration := existingK8sRes.GetGeneration()

// if isEqual {
// return true, nil
// }

ensureAnnotation(existingK8sRes, newAnnotations)
ensureLabel(existingK8sRes, newLabels)

klog.V(2).Infof("updating k8s resource with apiversion: %s, kind: %s, %s/%s", apiversion, kind, namespace, name)

err = r.Update(ctx, &existingK8sRes)

if err != nil {
return false, errors.Wrapf(err, "failed to update k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

UpdatedK8sRes := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiversion,
"kind": kind,
},
}

err = r.Client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &UpdatedK8sRes)

if err != nil {
return false, errors.Wrapf(err, "failed to get k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)

}

if UpdatedK8sRes.GetGeneration() != CRgeneration {
klog.V(2).Infof("Finish updating the k8s Resource: -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

return true, nil
return false, errors.Wrapf(err, "failed to wait k8s resource deleted -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
})

if err != nil {
return errors.Wrapf(err, "failed to wait k8s resource deleted -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

var k8sRes unstructured.Unstructured
k8sRes.SetAPIVersion(apiversion)
k8sRes.SetKind(kind)
k8sRes.SetName(name)
k8sRes.SetNamespace(namespace)
if err = r.createK8sResource(ctx, k8sRes, k8sResConfig, newLabels, newAnnotations); err != nil {
return errors.Wrapf(err, "failed to update k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

Expand Down