Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Revert "update the logic for updating the k8s resourcces (#752)"
This reverts commit c840b9e.
  • Loading branch information
Daniel-Fan committed Sep 30, 2021
commit d4376ebbcc0b393127ee186abf0e797960eacdb7
133 changes: 82 additions & 51 deletions controllers/operandrequest/reconcile_operand.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
Expand Down Expand Up @@ -208,6 +207,11 @@ func (r *Reconciler) reconcileCRwithConfig(ctx context.Context, service *operato
k8sRes.SetName(res.Name)
k8sRes.SetNamespace(k8sResNs)

var k8sResConfig []byte
if res.Data != nil {
k8sResConfig = res.Data.Raw
}

err := r.Client.Get(ctx, types.NamespacedName{
Name: res.Name,
Namespace: k8sResNs,
Expand All @@ -216,14 +220,14 @@ func (r *Reconciler) reconcileCRwithConfig(ctx context.Context, service *operato
if err != nil && !apierrors.IsNotFound(err) {
merr.Add(errors.Wrapf(err, "failed to get k8s resource %s/%s", k8sResNs, res.Name))
} else if apierrors.IsNotFound(err) {
if err := r.createK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations); err != nil {
if err := r.createK8sResource(ctx, k8sRes, k8sResConfig, res.Labels, res.Annotations); err != nil {
merr.Add(err)
}
} else {
if checkLabel(k8sRes, map[string]string{constant.OpreqLabel: "true"}) && res.Force {
// Update k8s resource
klog.V(3).Info("Found existing k8s resource: " + res.Name)
if err := r.updateK8sResource(ctx, k8sRes, res.Data, res.Labels, res.Annotations); err != nil {
if err := r.updateK8sResource(ctx, k8sRes, k8sResConfig, res.Labels, res.Annotations); err != nil {
merr.Add(err)
}
} else {
Expand Down Expand Up @@ -771,21 +775,19 @@ func (r *Reconciler) checkCustomResource(ctx context.Context, requestInstance *o
return nil
}

func (r *Reconciler) createK8sResource(ctx context.Context, k8sResTemplate unstructured.Unstructured, k8sResConfig *runtime.RawExtension, newLabels, newAnnotations map[string]string) error {
func (r *Reconciler) createK8sResource(ctx context.Context, k8sResTemplate unstructured.Unstructured, k8sResConfig []byte, newLabels, newAnnotations map[string]string) error {
kind := k8sResTemplate.GetKind()
name := k8sResTemplate.GetName()
namespace := k8sResTemplate.GetNamespace()

if k8sResConfig != nil {
k8sResConfigDecoded := make(map[string]interface{})
k8sResConfigUnmarshalErr := json.Unmarshal(k8sResConfig.Raw, &k8sResConfigDecoded)
if k8sResConfigUnmarshalErr != nil {
klog.Errorf("failed to unmarshal k8s Resource Config: %v", k8sResConfigUnmarshalErr)
}
k8sResConfigDecoded := make(map[string]interface{})
k8sResConfigUnmarshalErr := json.Unmarshal(k8sResConfig, &k8sResConfigDecoded)
if k8sResConfigUnmarshalErr != nil {
klog.Errorf("failed to unmarshal k8s Resource Config: %v", k8sResConfigUnmarshalErr)
}

for k, v := range k8sResConfigDecoded {
k8sResTemplate.Object[k] = v
}
for k, v := range k8sResConfigDecoded {
k8sResTemplate.Object[k] = v
}

ensureLabel(k8sResTemplate, map[string]string{constant.OpreqLabel: "true"})
Expand All @@ -803,60 +805,89 @@ func (r *Reconciler) createK8sResource(ctx context.Context, k8sResTemplate unstr
return nil
}

func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstructured.Unstructured, k8sResConfig *runtime.RawExtension, newLabels, newAnnotations map[string]string) error {
func (r *Reconciler) updateK8sResource(ctx context.Context, existingK8sRes unstructured.Unstructured, k8sResConfig []byte, newLabels, newAnnotations map[string]string) error {
kind := existingK8sRes.GetKind()
apiversion := existingK8sRes.GetAPIVersion()
name := existingK8sRes.GetName()
namespace := existingK8sRes.GetNamespace()

existingK8sRes = unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiversion,
"kind": kind,
},
}

if err := r.Client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &existingK8sRes); err != nil {
return errors.Wrapf(err, "failed to get k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

if !checkLabel(existingK8sRes, map[string]string{constant.OpreqLabel: "true"}) {
return nil
}
// Update the k8s res
err := wait.PollImmediate(constant.DefaultCRFetchPeriod, constant.DefaultCRFetchTimeout, func() (bool, error) {

// delete the existing k8s resource, some resources could not be updated after it has been created
if err := r.Client.Delete(ctx, &existingK8sRes); err != nil {
return errors.Wrapf(err, "failed to delete existing k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}
existingK8sRes := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiversion,
"kind": kind,
},
}

// wait for the resource has been deleted
err := wait.PollImmediate(constant.DefaultCRFetchPeriod, constant.DefaultCRFetchTimeout, func() (bool, error) {
err := r.Client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &existingK8sRes)
if err == nil {
// continue wait for the deletion
return false, nil
} else if err != nil && apierrors.IsNotFound(err) {

if err != nil {
return false, errors.Wrapf(err, "failed to get k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

if !checkLabel(existingK8sRes, map[string]string{constant.OpreqLabel: "true"}) {
return true, nil
}
return false, errors.Wrapf(err, "failed to wait k8s resource deleted -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)

// isEqual := checkAnnotation(existingK8sRes, newAnnotations) && checkLabel(existingK8sRes, newLabels)
k8sResConfigDecoded := make(map[string]interface{})
k8sResConfigUnmarshalErr := json.Unmarshal(k8sResConfig, &k8sResConfigDecoded)
if k8sResConfigUnmarshalErr != nil {
klog.Errorf("failed to unmarshal k8s Resource Config: %v", k8sResConfigUnmarshalErr)
}

for k, v := range k8sResConfigDecoded {
// isEqual = isEqual && reflect.DeepEqual(existingK8sRes.Object[k], v)
existingK8sRes.Object[k] = v
}

CRgeneration := existingK8sRes.GetGeneration()

// if isEqual {
// return true, nil
// }

ensureAnnotation(existingK8sRes, newAnnotations)
ensureLabel(existingK8sRes, newLabels)

klog.V(2).Infof("updating k8s resource with apiversion: %s, kind: %s, %s/%s", apiversion, kind, namespace, name)

err = r.Update(ctx, &existingK8sRes)

if err != nil {
return false, errors.Wrapf(err, "failed to update k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

UpdatedK8sRes := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiversion,
"kind": kind,
},
}

err = r.Client.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &UpdatedK8sRes)

if err != nil {
return false, errors.Wrapf(err, "failed to get k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)

}

if UpdatedK8sRes.GetGeneration() != CRgeneration {
klog.V(2).Infof("Finish updating the k8s Resource: -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

return true, nil
})

if err != nil {
return errors.Wrapf(err, "failed to wait k8s resource deleted -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

var k8sRes unstructured.Unstructured
k8sRes.SetAPIVersion(apiversion)
k8sRes.SetKind(kind)
k8sRes.SetName(name)
k8sRes.SetNamespace(namespace)
if err = r.createK8sResource(ctx, k8sRes, k8sResConfig, newLabels, newAnnotations); err != nil {
return errors.Wrapf(err, "failed to update k8s resource -- Kind: %s, NamespacedName: %s/%s", kind, namespace, name)
}

Expand Down