Skip to content

Commit

Permalink
fix expected patch operations may be missed when AggregateStatus
Browse files Browse the repository at this point in the history
Signed-off-by: chaosi-zju <[email protected]>
  • Loading branch information
chaosi-zju committed Aug 14, 2024
1 parent 5a10d75 commit 478b8c6
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 436 deletions.
81 changes: 54 additions & 27 deletions pkg/controllers/status/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"context"
"reflect"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand All @@ -34,8 +37,8 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

Expand Down Expand Up @@ -105,43 +108,67 @@ func updateResourceStatus(
dynamicClient dynamic.Interface,
restMapper meta.RESTMapper,
interpreter resourceinterpreter.ResourceInterpreter,
resource *unstructured.Unstructured,
eventRecorder record.EventRecorder,
objRef workv1alpha2.ObjectReference,
bindingStatus workv1alpha2.ResourceBindingStatus,
) error {
gvr, err := restmapper.GetGroupVersionResource(restMapper, schema.FromAPIVersionAndKind(resource.GetAPIVersion(), resource.GetKind()))
gvr, err := restmapper.GetGroupVersionResource(restMapper, schema.FromAPIVersionAndKind(objRef.APIVersion, objRef.Kind))
if err != nil {
klog.Errorf("Failed to get GVR from GVK(%s/%s), Error: %v", resource.GetAPIVersion(), resource.GetKind(), err)
klog.Errorf("Failed to get GVR from GVK(%s/%s), Error: %v", objRef.APIVersion, objRef.Kind, err)
return err
}

if !interpreter.HookEnabled(resource.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus) {
gvk := schema.GroupVersionKind{Group: gvr.Group, Version: gvr.Version, Kind: objRef.Kind}
if !interpreter.HookEnabled(gvk, configv1alpha1.InterpreterOperationAggregateStatus) {
return nil
}
newObj, err := interpreter.AggregateStatus(resource, bindingStatus.AggregatedStatus)
if err != nil {
klog.Errorf("Failed to aggregate status for resource(%s/%s/%s, Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err)
return err
}

oldStatus, _, _ := unstructured.NestedFieldNoCopy(resource.Object, "status")
newStatus, _, _ := unstructured.NestedFieldNoCopy(newObj.Object, "status")
if reflect.DeepEqual(oldStatus, newStatus) {
klog.V(3).Infof("Ignore update resource(%s/%s/%s) status as up to date.", gvr, resource.GetNamespace(), resource.GetName())
return nil
}
var resource *unstructured.Unstructured
if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Fetch resource template from karmada-apiserver instead of informer cache, to avoid retry due to
// resource conflict which often happens, especially with a huge amount of resource templates and
// the informer cache doesn't sync quickly enough.
// For more details refer to https://github.com/karmada-io/karmada/issues/5285.
resource, err = dynamicClient.Resource(gvr).Namespace(objRef.Namespace).Get(ctx, objRef.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// It might happen when the resource template has been removed but the garbage collector hasn't removed
// the ResourceBinding which dependent on resource template.
// So, just return without retry(requeue) would save unnecessary loop.
return nil
}
klog.Errorf("Failed to fetch resource template(%s/%s/%s), Error: %v.", gvr, objRef.Namespace, objRef.Name, err)
return err
}

patchBytes, err := helper.GenReplaceFieldJSONPatch("/status", oldStatus, newStatus)
if err != nil {
klog.Errorf("Failed to gen patch bytes for resource(%s/%s/%s, Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err)
return err
}
newObj, err := interpreter.AggregateStatus(resource, bindingStatus.AggregatedStatus)
if err != nil {
klog.Errorf("Failed to aggregate status for resource template(%s/%s/%s), Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err)
return err
}

_, err = dynamicClient.Resource(gvr).Namespace(resource.GetNamespace()).
Patch(ctx, resource.GetName(), types.JSONPatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err)
oldStatus, _, _ := unstructured.NestedFieldNoCopy(resource.Object, "status")
newStatus, _, _ := unstructured.NestedFieldNoCopy(newObj.Object, "status")
if reflect.DeepEqual(oldStatus, newStatus) {
klog.V(3).Infof("Ignore update resource(%s/%s/%s) status as up to date.", gvr, resource.GetNamespace(), resource.GetName())
return nil
}

if _, err = dynamicClient.Resource(gvr).Namespace(resource.GetNamespace()).UpdateStatus(ctx, newObj, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", gvr, resource.GetNamespace(), resource.GetName(), err)
return err
}

eventRecorder.Event(resource, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, "Update Resource with AggregatedStatus successfully.")
klog.V(3).Infof("Update resource(%s/%s/%s) status successfully.", gvr, resource.GetNamespace(), resource.GetName())

return nil
}); err != nil {
if resource != nil {
eventRecorder.Event(resource, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error())
}
return err
}
klog.V(3).Infof("Update resource(%s/%s/%s) status successfully.", gvr, resource.GetNamespace(), resource.GetName())

return nil
}
17 changes: 2 additions & 15 deletions pkg/controllers/status/crb_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,14 @@ func (c *CRBStatusController) SetupWithManager(mgr controllerruntime.Manager) er
}

func (c *CRBStatusController) syncBindingStatus(ctx context.Context, binding *workv1alpha2.ClusterResourceBinding) error {
resource, err := helper.FetchResourceTemplate(ctx, c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
if err != nil {
if apierrors.IsNotFound(err) {
// It might happen when the resource template has been removed but the garbage collector hasn't removed
// the ResourceBinding which dependent on resource template.
// So, just return without retry(requeue) would save unnecessary loop.
return nil
}
klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v",
binding.GetName(), err)
return err
}

err = helper.AggregateClusterResourceBindingWorkStatus(ctx, c.Client, binding, resource, c.EventRecorder)
err := helper.AggregateClusterResourceBindingWorkStatus(ctx, c.Client, binding, c.EventRecorder)
if err != nil {
klog.Errorf("Failed to aggregate workStatues to clusterResourceBinding(%s), Error: %v",
binding.Name, err)
return err
}

err = updateResourceStatus(ctx, c.DynamicClient, c.RESTMapper, c.ResourceInterpreter, resource, binding.Status)
err = updateResourceStatus(ctx, c.DynamicClient, c.RESTMapper, c.ResourceInterpreter, c.EventRecorder, binding.Spec.Resource, binding.Status)
if err != nil {
return err
}
Expand Down
20 changes: 4 additions & 16 deletions pkg/controllers/status/rb_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,29 +111,17 @@ func (c *RBStatusController) SetupWithManager(mgr controllerruntime.Manager) err
}

func (c *RBStatusController) syncBindingStatus(ctx context.Context, binding *workv1alpha2.ResourceBinding) error {
resourceTemplate, err := helper.FetchResourceTemplate(ctx, c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
err := helper.AggregateResourceBindingWorkStatus(ctx, c.Client, binding, c.EventRecorder)
if err != nil {
if apierrors.IsNotFound(err) {
// It might happen when the resource template has been removed but the garbage collector hasn't removed
// the ResourceBinding which dependent on resource template.
// So, just return without retry(requeue) would save unnecessary loop.
return nil
}
klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
return err
}

err = helper.AggregateResourceBindingWorkStatus(ctx, c.Client, binding, resourceTemplate, c.EventRecorder)
if err != nil {
klog.Errorf("Failed to aggregate workStatues to resourceBinding(%s/%s), Error: %v",
klog.Errorf("Failed to aggregate workStatus to resourceBinding(%s/%s), Error: %v",
binding.Namespace, binding.Name, err)
return err
}

err = updateResourceStatus(ctx, c.DynamicClient, c.RESTMapper, c.ResourceInterpreter, resourceTemplate, binding.Status)
err = updateResourceStatus(ctx, c.DynamicClient, c.RESTMapper, c.ResourceInterpreter, c.EventRecorder, binding.Spec.Resource, binding.Status)
if err != nil {
return err
}

return nil
}
4 changes: 3 additions & 1 deletion pkg/controllers/status/work_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ func (c *WorkStatusController) reflectStatus(ctx context.Context, work *workv1al
}

func (c *WorkStatusController) buildStatusIdentifier(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) (*workv1alpha1.ResourceIdentifier, error) {
ordinal, err := helper.GetManifestIndex(work.Spec.Workload.Manifests, clusterObj)
manifestRef := helper.ManifestReference{APIVersion: clusterObj.GetAPIVersion(), Kind: clusterObj.GetKind(),
Namespace: clusterObj.GetNamespace(), Name: clusterObj.GetName()}
ordinal, err := helper.GetManifestIndex(work.Spec.Workload.Manifests, &manifestRef)
if err != nil {
return nil, err
}
Expand Down
55 changes: 0 additions & 55 deletions pkg/util/helper/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,10 @@ package helper
import (
"encoding/json"
"fmt"
"reflect"

jsonpatch "github.com/evanphx/json-patch/v5"
)

// RFC6902 JSONPatch operations
const (
JSONPatchOPAdd = "add"
JSONPatchOPReplace = "replace"
JSONPatchOPRemove = "remove"
JSONPatchOPMove = "move"
JSONPatchOPCopy = "copy"
JSONPatchOPTest = "test"
)

type jsonPatch struct {
OP string `json:"op"`
From string `json:"from,omitempty"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}

// GenMergePatch will return a merge patch document capable of converting the
// original object to the modified object.
// The merge patch format is primarily intended for use with the HTTP PATCH method
Expand Down Expand Up @@ -84,40 +66,3 @@ func GenFieldMergePatch(fieldName string, originField interface{}, modifiedField
patchBytes = append([]byte(`{"`+fieldName+`":`), patchBytes...)
return patchBytes, nil
}

// GenReplaceFieldJSONPatch returns the RFC6902 JSONPatch array as []byte, which is used to simply
// add/replace/delete certain JSON **Object** field.
func GenReplaceFieldJSONPatch(path string, originalFieldValue, newFieldValue interface{}) ([]byte, error) {
if reflect.DeepEqual(originalFieldValue, newFieldValue) {
return nil, nil
}
if newFieldValue == nil {
return GenJSONPatch(JSONPatchOPRemove, "", path, nil)
}
// The implementation of "add" and "replace" for JSON objects is actually the same
// in "github.com/evanphx/json-patch/v5", which is used by Karmada and K8s.
// We implemented it here just to follow the RFC6902.
if originalFieldValue == nil {
return GenJSONPatch(JSONPatchOPAdd, "", path, newFieldValue)
}
return GenJSONPatch(JSONPatchOPReplace, "", path, newFieldValue)
}

// GenJSONPatch return JSONPatch array as []byte according to RFC6902
func GenJSONPatch(op, from, path string, value interface{}) ([]byte, error) {
jp := jsonPatch{
OP: op,
Path: path,
}
switch op {
case JSONPatchOPAdd, JSONPatchOPReplace, JSONPatchOPTest:
jp.Value = value
case JSONPatchOPMove, JSONPatchOPCopy:
jp.From = from
case JSONPatchOPRemove:
default:
return nil, fmt.Errorf("unrecognized JSONPatch OP: %s", op)
}

return json.Marshal([]jsonPatch{jp})
}
Loading

0 comments on commit 478b8c6

Please sign in to comment.