diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 03de411cce..2bbbb33aea 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -28,12 +28,12 @@ jobs: id: vars run: echo "go_version=$(make go-version)" >> $GITHUB_OUTPUT - name: Set up Go - uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # tag=v5.5.0 + uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # tag=v6.0.0 with: go-version: ${{ steps.vars.outputs.go_version }} - name: golangci-lint uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # tag=v8.0.0 with: - version: v2.3.0 + version: v2.5.0 args: --output.text.print-linter-name=true --output.text.colors=true --timeout 10m working-directory: ${{matrix.working-directory}} diff --git a/.github/workflows/ossf-scorecard.yaml b/.github/workflows/ossf-scorecard.yaml index 671dbc88bd..24156f49e0 100644 --- a/.github/workflows/ossf-scorecard.yaml +++ b/.github/workflows/ossf-scorecard.yaml @@ -31,7 +31,7 @@ jobs: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@05b42c624433fc40578a4040d5cf5e36ddca8cde # tag=v2.4.2 + uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # tag=v2.4.3 with: results_file: results.sarif results_format: sarif diff --git a/.github/workflows/pr-dependabot.yaml b/.github/workflows/pr-dependabot.yaml index b51cab0d8c..10162e9129 100644 --- a/.github/workflows/pr-dependabot.yaml +++ b/.github/workflows/pr-dependabot.yaml @@ -24,7 +24,7 @@ jobs: id: vars run: echo "go_version=$(make go-version)" >> $GITHUB_OUTPUT - name: Set up Go - uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # tag=v5.5.0 + uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # tag=v6.0.0 with: go-version: ${{ steps.vars.outputs.go_version }} - name: Update all modules diff --git a/.github/workflows/pr-gh-workflow-approve.yaml b/.github/workflows/pr-gh-workflow-approve.yaml index f493fd4003..28be4dac71 100644 --- a/.github/workflows/pr-gh-workflow-approve.yaml +++ b/.github/workflows/pr-gh-workflow-approve.yaml @@ -19,7 +19,7 @@ jobs: actions: write steps: - name: Update PR - uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1 + uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0 continue-on-error: true with: github-token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 353a0a1c5c..6e3b7734e7 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -22,14 +22,14 @@ jobs: id: vars run: echo "go_version=$(make go-version)" >> $GITHUB_OUTPUT - name: Set up Go - uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # tag=v5.5.0 + uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # tag=v6.0.0 with: go-version: ${{ steps.vars.outputs.go_version }} - name: Generate release binaries run: | make release - name: Release - uses: softprops/action-gh-release@72f2c25fcb47643c292f7107632f7a47c1df5cd8 # tag=v2.3.2 + uses: softprops/action-gh-release@62c96d0c4e8a889135c1f3a25910db8dbe0e85f7 # tag=v2.3.4 with: draft: false files: tools/setup-envtest/out/* diff --git a/.golangci.yml b/.golangci.yml index 1741432a01..85701c88a8 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -22,10 +22,12 @@ linters: - goconst - gocritic - gocyclo + - godoclint - goprintffuncname - govet - importas - ineffassign + - iotamixing - makezero - misspell - nakedret diff 
--git a/examples/crd/pkg/groupversion_info.go b/examples/crd/pkg/groupversion_info.go index 31dfbbc779..693d255b05 100644 --- a/examples/crd/pkg/groupversion_info.go +++ b/examples/crd/pkg/groupversion_info.go @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package pkg contains API Schema definitions for the chaosapps v1 API group // +kubebuilder:object:generate=true // +groupName=chaosapps.metamagical.io package pkg diff --git a/examples/priorityqueue/main.go b/examples/priorityqueue/main.go index 8dacdcc9a3..1dc10c2cbe 100644 --- a/examples/priorityqueue/main.go +++ b/examples/priorityqueue/main.go @@ -24,7 +24,6 @@ import ( "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/builder" kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/config" @@ -52,7 +51,7 @@ func run() error { // Setup a Manager mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{ - Controller: config.Controller{UsePriorityQueue: ptr.To(true)}, + Controller: config.Controller{}, }) if err != nil { return fmt.Errorf("failed to set up controller-manager: %w", err) diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index a92a25b7d8..546c7c39ee 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -54,10 +54,10 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.34.0 // indirect - k8s.io/apiextensions-apiserver v0.34.0 // indirect - k8s.io/apimachinery v0.34.0 // indirect - k8s.io/client-go v0.34.0 // indirect + k8s.io/api v0.34.1 // indirect + k8s.io/apiextensions-apiserver v0.34.1 // indirect + k8s.io/apimachinery v0.34.1 // indirect + k8s.io/client-go v0.34.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 703b352e28..012b88f447 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -173,14 +173,14 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= -k8s.io/api v0.34.0/go.mod h1:YzgkIzOOlhl9uwWCZNqpw6RJy9L2FK4dlJeayUoydug= -k8s.io/apiextensions-apiserver v0.34.0 h1:B3hiB32jV7BcyKcMU5fDaDxk882YrJ1KU+ZSkA9Qxoc= -k8s.io/apiextensions-apiserver v0.34.0/go.mod h1:hLI4GxE1BDBy9adJKxUxCEHBGZtGfIg98Q+JmTD7+g0= -k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= -k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/client-go v0.34.0 h1:YoWv5r7bsBfb0Hs2jh8SOvFbKzzxyNo0nSb0zC19KZo= -k8s.io/client-go v0.34.0/go.mod h1:ozgMnEKXkRjeMvBZdV1AijMHLTh3pbACPvK7zFR+QQY= +k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= +k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= +k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= +k8s.io/apiextensions-apiserver v0.34.1/go.mod 
h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= +k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= diff --git a/examples/tokenreview/tokenreview.go b/examples/tokenreview/tokenreview.go index cc64545e16..16e4151077 100644 --- a/examples/tokenreview/tokenreview.go +++ b/examples/tokenreview/tokenreview.go @@ -28,7 +28,7 @@ import ( type authenticator struct { } -// authenticator admits a request by the token. +// Handle admits a request by the token. func (a *authenticator) Handle(ctx context.Context, req authentication.Request) authentication.Response { if req.Spec.Token == "invalid" { return authentication.Unauthenticated("invalid is an invalid token", v1.UserInfo{}) diff --git a/go.mod b/go.mod index 36bce9c9e5..4d998fe2fc 100644 --- a/go.mod +++ b/go.mod @@ -21,11 +21,11 @@ require ( golang.org/x/sys v0.31.0 gomodules.xyz/jsonpatch/v2 v2.4.0 gopkg.in/evanphx/json-patch.v4 v4.12.0 // Using v4 to match upstream - k8s.io/api v0.34.0 - k8s.io/apiextensions-apiserver v0.34.0 - k8s.io/apimachinery v0.34.0 - k8s.io/apiserver v0.34.0 - k8s.io/client-go v0.34.0 + k8s.io/api v0.34.1 + k8s.io/apiextensions-apiserver v0.34.1 + k8s.io/apimachinery v0.34.1 + k8s.io/apiserver v0.34.1 + k8s.io/client-go v0.34.1 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 sigs.k8s.io/structured-merge-diff/v6 v6.3.0 @@ -95,7 +95,7 @@ require ( google.golang.org/protobuf v1.36.5 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/component-base v0.34.0 // indirect + k8s.io/component-base v0.34.1 // indirect k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect diff --git a/go.sum b/go.sum index 102a137d04..d6278d8a7d 100644 --- a/go.sum +++ b/go.sum @@ -229,18 +229,18 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE= -k8s.io/api v0.34.0/go.mod h1:YzgkIzOOlhl9uwWCZNqpw6RJy9L2FK4dlJeayUoydug= -k8s.io/apiextensions-apiserver v0.34.0 h1:B3hiB32jV7BcyKcMU5fDaDxk882YrJ1KU+ZSkA9Qxoc= -k8s.io/apiextensions-apiserver v0.34.0/go.mod h1:hLI4GxE1BDBy9adJKxUxCEHBGZtGfIg98Q+JmTD7+g0= -k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0= -k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/apiserver v0.34.0 h1:Z51fw1iGMqN7uJ1kEaynf2Aec1Y774PqU+FVWCFV3Jg= -k8s.io/apiserver v0.34.0/go.mod h1:52ti5YhxAvewmmpVRqlASvaqxt0gKJxvCeW7ZrwgazQ= -k8s.io/client-go v0.34.0 h1:YoWv5r7bsBfb0Hs2jh8SOvFbKzzxyNo0nSb0zC19KZo= -k8s.io/client-go v0.34.0/go.mod h1:ozgMnEKXkRjeMvBZdV1AijMHLTh3pbACPvK7zFR+QQY= 
-k8s.io/component-base v0.34.0 h1:bS8Ua3zlJzapklsB1dZgjEJuJEeHjj8yTu1gxE2zQX8= -k8s.io/component-base v0.34.0/go.mod h1:RSCqUdvIjjrEm81epPcjQ/DS+49fADvGSCkIP3IC6vg= +k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= +k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= +k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= +k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/apiserver v0.34.1 h1:U3JBGdgANK3dfFcyknWde1G6X1F4bg7PXuvlqt8lITA= +k8s.io/apiserver v0.34.1/go.mod h1:eOOc9nrVqlBI1AFCvVzsob0OxtPZUCPiUJL45JOTBG0= +k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= +k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= +k8s.io/component-base v0.34.1 h1:v7xFgG+ONhytZNFpIz5/kecwD+sUhVE6HU7qQUiRM4A= +k8s.io/component-base v0.34.1/go.mod h1:mknCpLlTSKHzAQJJnnHVKqjxR7gBeHRv0rPXA7gdtQ0= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= diff --git a/pkg/builder/webhook.go b/pkg/builder/webhook.go index 6263f030a0..6f4726d274 100644 --- a/pkg/builder/webhook.go +++ b/pkg/builder/webhook.go @@ -17,6 +17,7 @@ limitations under the License. package builder import ( + "context" "errors" "net/http" "net/url" @@ -49,6 +50,7 @@ type WebhookBuilder struct { config *rest.Config recoverPanic *bool logConstructor func(base logr.Logger, req *admission.Request) logr.Logger + contextFunc func(context.Context, *http.Request) context.Context err error } @@ -90,6 +92,12 @@ func (blder *WebhookBuilder) WithLogConstructor(logConstructor func(base logr.Lo return blder } +// WithContextFunc overrides the webhook's WithContextFunc. +func (blder *WebhookBuilder) WithContextFunc(contextFunc func(context.Context, *http.Request) context.Context) *WebhookBuilder { + blder.contextFunc = contextFunc + return blder +} + // RecoverPanic indicates whether panics caused by the webhook should be recovered. // Defaults to true. 
func (blder *WebhookBuilder) RecoverPanic(recoverPanic bool) *WebhookBuilder { @@ -205,6 +213,7 @@ func (blder *WebhookBuilder) registerDefaultingWebhook() error { mwh := blder.getDefaultingWebhook() if mwh != nil { mwh.LogConstructor = blder.logConstructor + mwh.WithContextFunc = blder.contextFunc path := generateMutatePath(blder.gvk) if blder.customDefaulterCustomPath != "" { generatedCustomPath, err := generateCustomPath(blder.customDefaulterCustomPath) @@ -243,6 +252,7 @@ func (blder *WebhookBuilder) registerValidatingWebhook() error { vwh := blder.getValidatingWebhook() if vwh != nil { vwh.LogConstructor = blder.logConstructor + vwh.WithContextFunc = blder.contextFunc path := generateValidatePath(blder.gvk) if blder.customValidatorCustomPath != "" { generatedCustomPath, err := generateCustomPath(blder.customValidatorCustomPath) diff --git a/pkg/builder/webhook_test.go b/pkg/builder/webhook_test.go index eb70af2e0a..72538ef7bf 100644 --- a/pkg/builder/webhook_test.go +++ b/pkg/builder/webhook_test.go @@ -49,8 +49,14 @@ const ( svcBaseAddr = "http://svc-name.svc-ns.svc" customPath = "/custom-path" + + userAgentHeader = "User-Agent" + userAgentCtxKey agentCtxKey = "UserAgent" + userAgentValue = "test" ) +type agentCtxKey string + var _ = Describe("webhook", func() { Describe("New", func() { Context("v1 AdmissionReview", func() { @@ -315,6 +321,9 @@ func runTests(admissionReviewVersion string) { WithLogConstructor(func(base logr.Logger, req *admission.Request) logr.Logger { return admission.DefaultLogConstructor(testingLogger, req) }). + WithContextFunc(func(ctx context.Context, request *http.Request) context.Context { + return context.WithValue(ctx, userAgentCtxKey, request.Header.Get(userAgentHeader)) + }). Complete() ExpectWithOffset(1, err).NotTo(HaveOccurred()) svr := m.GetWebhookServer() @@ -344,6 +353,30 @@ func runTests(admissionReviewVersion string) { } } }`) + readerWithCtx := strings.NewReader(admissionReviewGV + admissionReviewVersion + `", + "request":{ + "uid":"07e52e8d-4513-11e9-a716-42010a800270", + "kind":{ + "group":"foo.test.org", + "version":"v1", + "kind":"TestValidator" + }, + "resource":{ + "group":"foo.test.org", + "version":"v1", + "resource":"testvalidator" + }, + "namespace":"default", + "name":"foo", + "operation":"UPDATE", + "object":{ + "replica":1 + }, + "oldObject":{ + "replica":1 + } + } +}`) ctx, cancel := context.WithCancel(specCtx) cancel() @@ -373,6 +406,20 @@ func runTests(admissionReviewVersion string) { ExpectWithOffset(1, w.Body).To(ContainSubstring(`"allowed":false`)) ExpectWithOffset(1, w.Body).To(ContainSubstring(`"code":403`)) EventuallyWithOffset(1, logBuffer).Should(gbytes.Say(`"msg":"Validating object","object":{"name":"foo","namespace":"default"},"namespace":"default","name":"foo","resource":{"group":"foo.test.org","version":"v1","resource":"testvalidator"},"user":"","requestID":"07e52e8d-4513-11e9-a716-42010a800270"`)) + + By("sending a request to a validating webhook with context header validation") + path = generateValidatePath(testValidatorGVK) + _, err = readerWithCtx.Seek(0, 0) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + req = httptest.NewRequest("POST", svcBaseAddr+path, readerWithCtx) + req.Header.Add("Content-Type", "application/json") + req.Header.Add(userAgentHeader, userAgentValue) + w = httptest.NewRecorder() + svr.WebhookMux().ServeHTTP(w, req) + ExpectWithOffset(1, w.Code).To(Equal(http.StatusOK)) + By("sanity checking the response contains reasonable fields") + ExpectWithOffset(1,
w.Body).To(ContainSubstring(`"allowed":true`)) + ExpectWithOffset(1, w.Body).To(ContainSubstring(`"code":200`)) }) It("should scaffold a custom validating webhook with a custom path", func(specCtx SpecContext) { @@ -1009,6 +1056,7 @@ func (*TestCustomDefaulter) Default(ctx context.Context, obj runtime.Object) err if d.Replica < 2 { d.Replica = 2 } + return nil } @@ -1035,6 +1083,7 @@ func (*TestCustomValidator) ValidateCreate(ctx context.Context, obj runtime.Obje if v.Replica < 0 { return nil, errors.New("number of replica should be greater than or equal to 0") } + return nil, nil } @@ -1056,6 +1105,12 @@ func (*TestCustomValidator) ValidateUpdate(ctx context.Context, oldObj, newObj r if v.Replica < old.Replica { return nil, fmt.Errorf("new replica %v should not be fewer than old replica %v", v.Replica, old.Replica) } + + userAgent, ok := ctx.Value(userAgentCtxKey).(string) + if ok && userAgent != userAgentValue { + return nil, fmt.Errorf("expected %s value %q in TestCustomValidator, got %q", userAgentCtxKey, userAgentValue, userAgent) + } + return nil, nil } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 2364eec3e1..7748e2e317 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -2568,7 +2568,6 @@ func ensureNode(ctx context.Context, name string, client client.Client) error { return err } -//nolint:interfacer func isKubeService(svc metav1.Object) bool { // grumble grumble linters grumble grumble return svc.GetNamespace() == "default" && svc.GetName() == "kubernetes" diff --git a/pkg/client/client.go b/pkg/client/client.go index 092deb43d4..39050de457 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -151,8 +151,7 @@ func newClient(config *rest.Config, options Options) (*client, error) { mapper: options.Mapper, codecs: serializer.NewCodecFactory(options.Scheme), - structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), - unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), + resourceByType: make(map[cacheKey]*resourceMeta), } rawMetaClient, err := metadata.NewForConfigAndClient(metadata.ConfigFor(config), options.HTTPClient) @@ -544,6 +543,30 @@ func (po *SubResourcePatchOptions) ApplyToSubResourcePatch(o *SubResourcePatchOp } } +// SubResourceApplyOptions are the options for a subresource +// apply request. +type SubResourceApplyOptions struct { + ApplyOptions + SubResourceBody runtime.ApplyConfiguration +} + +// ApplyOpts applies the given options. +func (ao *SubResourceApplyOptions) ApplyOpts(opts []SubResourceApplyOption) *SubResourceApplyOptions { + for _, o := range opts { + o.ApplyToSubResourceApply(ao) + } + + return ao +} + +// ApplyToSubResourceApply applies the configuration to the given apply options. +func (ao *SubResourceApplyOptions) ApplyToSubResourceApply(o *SubResourceApplyOptions) { + ao.ApplyOptions.ApplyToApply(&o.ApplyOptions) + if ao.SubResourceBody != nil { + o.SubResourceBody = ao.SubResourceBody + } +} + func (sc *subResourceClient) Get(ctx context.Context, obj Object, subResource Object, opts ...SubResourceGetOption) error { switch obj.(type) { case runtime.Unstructured: @@ -595,3 +618,13 @@ func (sc *subResourceClient) Patch(ctx context.Context, obj Object, patch Patch, return sc.client.typedClient.PatchSubResource(ctx, obj, sc.subResource, patch, opts...)
} } + +func (sc *subResourceClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...SubResourceApplyOption) error { + switch obj := obj.(type) { + case *unstructuredApplyConfiguration: + defer sc.client.resetGroupVersionKind(obj, obj.GetObjectKind().GroupVersionKind()) + return sc.client.unstructuredClient.ApplySubResource(ctx, obj, sc.subResource, opts...) + default: + return sc.client.typedClient.ApplySubResource(ctx, obj, sc.subResource, opts...) + } +} diff --git a/pkg/client/client_rest_resources.go b/pkg/client/client_rest_resources.go index acff7a46a4..d75d685cbb 100644 --- a/pkg/client/client_rest_resources.go +++ b/pkg/client/client_rest_resources.go @@ -48,11 +48,15 @@ type clientRestResources struct { // codecs are used to create a REST client for a gvk codecs serializer.CodecFactory - // structuredResourceByType stores structured type metadata - structuredResourceByType map[schema.GroupVersionKind]*resourceMeta - // unstructuredResourceByType stores unstructured type metadata - unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta - mu sync.RWMutex + // resourceByType stores type metadata + resourceByType map[cacheKey]*resourceMeta + + mu sync.RWMutex +} + +type cacheKey struct { + gvk schema.GroupVersionKind + forceDisableProtoBuf bool } // newResource maps obj to a Kubernetes Resource and constructs a client for that Resource. @@ -117,11 +121,11 @@ func (c *clientRestResources) getResource(obj any) (*resourceMeta, error) { // It's better to do creation work twice than to not let multiple // people make requests at once c.mu.RLock() + + cacheKey := cacheKey{gvk: gvk, forceDisableProtoBuf: forceDisableProtoBuf} + + r, known := c.resourceByType[cacheKey] + - resourceByType := c.structuredResourceByType - if isUnstructured { - resourceByType = c.unstructuredResourceByType - } - r, known := resourceByType[gvk] c.mu.RUnlock() if known { @@ -140,7 +144,7 @@ func (c *clientRestResources) getResource(obj any) (*resourceMeta, error) { if err != nil { return nil, err } - resourceByType[gvk] = r + c.resourceByType[cacheKey] = r return r, err } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index c775f28718..021fbeb0d8 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -43,6 +43,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + appsv1applyconfigurations "k8s.io/client-go/applyconfigurations/apps/v1" + autoscaling1applyconfigurations "k8s.io/client-go/applyconfigurations/autoscaling/v1" corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1" kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -951,6 +953,52 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(cm.Data).To(BeComparableTo(data)) Expect(cm.Data).To(BeComparableTo(obj.Data)) }) + + It("should create a secret without SSA and later create and update a secret using SSA", func(ctx SpecContext) { + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + data := map[string][]byte{ + "some-key": []byte("some-value"), + } + secretObject := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "secret-one", + Namespace: "default", + }, + Data: data, + } + + secretApplyConfiguration := corev1applyconfigurations. + Secret("secret-two", "default").
+ WithData(data) + + err = cl.Create(ctx, secretObject) + Expect(err).NotTo(HaveOccurred()) + + err = cl.Apply(ctx, secretApplyConfiguration, &client.ApplyOptions{FieldManager: "test-manager"}) + Expect(err).NotTo(HaveOccurred()) + + secret, err := clientset.CoreV1().Secrets(ptr.Deref(secretApplyConfiguration.GetNamespace(), "")).Get(ctx, ptr.Deref(secretApplyConfiguration.GetName(), ""), metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + Expect(secret.Data).To(BeComparableTo(data)) + Expect(secret.Data).To(BeComparableTo(secretApplyConfiguration.Data)) + + data = map[string][]byte{ + "some-key": []byte("some-new-value"), + } + secretApplyConfiguration.Data = data + + err = cl.Apply(ctx, secretApplyConfiguration, &client.ApplyOptions{FieldManager: "test-manager"}) + Expect(err).NotTo(HaveOccurred()) + + secret, err = clientset.CoreV1().Secrets(ptr.Deref(secretApplyConfiguration.GetNamespace(), "")).Get(ctx, ptr.Deref(secretApplyConfiguration.GetName(), ""), metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + Expect(secret.Data).To(BeComparableTo(data)) + Expect(secret.Data).To(BeComparableTo(secretApplyConfiguration.Data)) + }) }) }) @@ -1127,6 +1175,34 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(err).NotTo(HaveOccurred()) Expect(*dep.Spec.Replicas).To(Equal(replicaCount)) }) + + It("should be able to apply the scale subresource", func(ctx SpecContext) { + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + + By("Creating a deployment") + dep, err := clientset.AppsV1().Deployments(dep.Namespace).Create(ctx, dep, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + replicaCount := *dep.Spec.Replicas + 1 + + By("Applying the scale subresource") + deploymentAC, err := appsv1applyconfigurations.ExtractDeployment(dep, "foo") + Expect(err).NotTo(HaveOccurred()) + scale := autoscaling1applyconfigurations.Scale().
+ WithSpec(autoscaling1applyconfigurations.ScaleSpec().WithReplicas(replicaCount)) + err = cl.SubResource("scale").Apply(ctx, deploymentAC, + &client.SubResourceApplyOptions{SubResourceBody: scale}, + client.FieldOwner("foo"), + client.ForceOwnership, + ) + Expect(err).NotTo(HaveOccurred()) + + By("Asserting replicas got updated") + dep, err = clientset.AppsV1().Deployments(dep.Namespace).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(*dep.Spec.Replicas).To(Equal(replicaCount)) + }) }) Context("with unstructured objects", func() { @@ -1322,8 +1398,8 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC By("Creating a deployment") dep, err := clientset.AppsV1().Deployments(dep.Namespace).Create(ctx, dep, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) - dep.APIVersion = "apps/v1" - dep.Kind = "Deployment" + dep.APIVersion = appsv1.SchemeGroupVersion.String() + dep.Kind = "Deployment" //nolint:goconst depUnstructured, err := toUnstructured(dep) Expect(err).NotTo(HaveOccurred()) @@ -1374,6 +1450,41 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(err).NotTo(HaveOccurred()) Expect(*dep.Spec.Replicas).To(Equal(replicaCount)) }) + + It("should be able to apply the scale subresource", func(ctx SpecContext) { + cl, err := client.New(cfg, client.Options{Scheme: runtime.NewScheme()}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + + By("Creating a deployment") + dep, err := clientset.AppsV1().Deployments(dep.Namespace).Create(ctx, dep, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + dep.APIVersion = "apps/v1" + dep.Kind = "Deployment" + depUnstructured, err := toUnstructured(dep) + Expect(err).NotTo(HaveOccurred()) + + By("Applying the scale subresource") + replicaCount := *dep.Spec.Replicas + 1 + scale := &unstructured.Unstructured{} + scale.SetAPIVersion("autoscaling/v1") + scale.SetKind("Scale") + Expect(unstructured.SetNestedField(scale.Object, int64(replicaCount), "spec", "replicas")).NotTo(HaveOccurred()) + err = cl.SubResource("scale").Apply(ctx, + client.ApplyConfigurationFromUnstructured(depUnstructured), + &client.SubResourceApplyOptions{SubResourceBody: client.ApplyConfigurationFromUnstructured(scale)}, + client.FieldOwner("foo"), + client.ForceOwnership, + ) + Expect(err).NotTo(HaveOccurred()) + Expect(scale.GetAPIVersion()).To(Equal("autoscaling/v1")) + Expect(scale.GetKind()).To(Equal("Scale")) + + By("Asserting replicas got updated") + dep, err = clientset.AppsV1().Deployments(dep.Namespace).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(*dep.Spec.Replicas).To(Equal(replicaCount)) + }) }) }) @@ -1440,6 +1551,29 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(dep.GroupVersionKind()).To(Equal(depGvk)) }) + It("should apply status", func(ctx SpecContext) { + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + + By("initially creating a Deployment") + dep, err := clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(dep.Status.Replicas).To(BeEquivalentTo(0)) + + By("applying the status of Deployment") + deploymentAC, err := appsv1applyconfigurations.ExtractDeployment(dep, "foo") + Expect(err).NotTo(HaveOccurred()) + deploymentAC.WithStatus(&appsv1applyconfigurations.DeploymentStatusApplyConfiguration{ + Replicas: ptr.To(int32(1)), + }) + Expect(cl.Status().Apply(ctx, deploymentAC, client.FieldOwner("foo"))).To(Succeed()) + + dep, err =
clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(dep.Status.Replicas).To(BeEquivalentTo(1)) + }) + It("should not update spec of an existing object", func(ctx SpecContext) { cl, err := client.New(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) @@ -1592,6 +1726,34 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC Expect(actual.Status.Replicas).To(BeEquivalentTo(1)) }) + It("should apply status and preserve type information", func(ctx SpecContext) { + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + + By("initially creating a Deployment") + dep, err := clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(dep.Status.Replicas).To(BeEquivalentTo(0)) + + By("applying the status of Deployment") + dep.Status.Replicas = 1 + dep.ManagedFields = nil // Must be unset in SSA requests + u := &unstructured.Unstructured{} + Expect(scheme.Convert(dep, u, nil)).To(Succeed()) + err = cl.Status().Apply(ctx, client.ApplyConfigurationFromUnstructured(u), client.FieldOwner("foo")) + Expect(err).NotTo(HaveOccurred()) + + By("validating updated Deployment has type information") + Expect(u.GroupVersionKind()).To(Equal(depGvk)) + + By("validating patched Deployment has new status") + actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(actual).NotTo(BeNil()) + Expect(actual.Status.Replicas).To(BeEquivalentTo(1)) + }) + It("should not update spec of an existing object", func(ctx SpecContext) { cl, err := client.New(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/client/config/config.go b/pkg/client/config/config.go index 70389dfa90..1c39f4d854 100644 --- a/pkg/client/config/config.go +++ b/pkg/client/config/config.go @@ -64,9 +64,6 @@ func RegisterFlags(fs *flag.FlagSet) { // The returned `*rest.Config` has client-side ratelimting disabled as we can rely on API priority and // fairness. Set its QPS to a value equal or bigger than 0 to re-enable it. // -// It also applies saner defaults for QPS and burst based on the Kubernetes -// controller manager defaults (20 QPS, 30 burst) -// // Config precedence: // // * --kubeconfig flag pointing at a file @@ -87,9 +84,6 @@ func GetConfig() (*rest.Config, error) { // The returned `*rest.Config` has client-side ratelimting disabled as we can rely on API priority and // fairness. Set its QPS to a value equal or bigger than 0 to re-enable it. // -// It also applies saner defaults for QPS and burst based on the Kubernetes -// controller manager defaults (20 QPS, 30 burst) -// // Config precedence: // // * --kubeconfig flag pointing at a file diff --git a/pkg/client/dryrun.go b/pkg/client/dryrun.go index a185860d33..fb7012200f 100644 --- a/pkg/client/dryrun.go +++ b/pkg/client/dryrun.go @@ -132,3 +132,7 @@ func (sw *dryRunSubResourceClient) Update(ctx context.Context, obj Object, opts func (sw *dryRunSubResourceClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...SubResourcePatchOption) error { return sw.client.Patch(ctx, obj, patch, append(opts, DryRunAll)...) } + +func (sw *dryRunSubResourceClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...SubResourceApplyOption) error { + return sw.client.Apply(ctx, obj, append(opts, DryRunAll)...) 
+} diff --git a/pkg/client/dryrun_test.go b/pkg/client/dryrun_test.go index 912a4a10dc..35a9b63869 100644 --- a/pkg/client/dryrun_test.go +++ b/pkg/client/dryrun_test.go @@ -27,6 +27,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + appsv1applyconfigurations "k8s.io/client-go/applyconfigurations/apps/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -260,4 +261,36 @@ var _ = Describe("DryRunClient", func() { Expect(actual).NotTo(BeNil()) Expect(actual).To(BeEquivalentTo(dep)) }) + + It("should not change objects via status apply", func(ctx SpecContext) { + deploymentAC, err := appsv1applyconfigurations.ExtractDeployment(dep, "test-owner") + Expect(err).NotTo(HaveOccurred()) + deploymentAC.WithStatus(&appsv1applyconfigurations.DeploymentStatusApplyConfiguration{ + Replicas: ptr.To(int32(99)), + }) + + Expect(getClient().Status().Apply(ctx, deploymentAC, client.FieldOwner("test-owner"))).NotTo(HaveOccurred()) + + actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(actual).NotTo(BeNil()) + Expect(actual).To(BeEquivalentTo(dep)) + }) + + It("should not change objects via status apply with opts", func(ctx SpecContext) { + deploymentAC, err := appsv1applyconfigurations.ExtractDeployment(dep, "test-owner") + Expect(err).NotTo(HaveOccurred()) + deploymentAC.WithStatus(&appsv1applyconfigurations.DeploymentStatusApplyConfiguration{ + Replicas: ptr.To(int32(99)), + }) + + opts := &client.SubResourceApplyOptions{ApplyOptions: client.ApplyOptions{DryRun: []string{"Bye", "Pippa"}}} + + Expect(getClient().Status().Apply(ctx, deploymentAC, client.FieldOwner("test-owner"), opts)).NotTo(HaveOccurred()) + + actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(actual).NotTo(BeNil()) + Expect(actual).To(BeEquivalentTo(dep)) + }) }) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 45f9e00e18..62067cb19c 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -17,13 +17,10 @@ limitations under the License. package fake import ( - "bytes" "context" "errors" "fmt" "reflect" - "runtime/debug" - "strconv" "strings" "sync" "time" @@ -65,7 +62,6 @@ import ( utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/watch" clientgoapplyconfigurations "k8s.io/client-go/applyconfigurations" "k8s.io/client-go/kubernetes/scheme" @@ -79,13 +75,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/internal/objectutil" ) -type versionedTracker struct { - testing.ObjectTracker - scheme *runtime.Scheme - withStatusSubresource sets.Set[schema.GroupVersionKind] - usesFieldManagedObjectTracker bool -} - type fakeClient struct { // trackerWriteLock must be acquired before writing to // the tracker or performing reads that affect a following @@ -141,6 +130,7 @@ type ClientBuilder struct { interceptorFuncs *interceptor.Funcs typeConverters []managedfields.TypeConverter returnManagedFields bool + isBuilt bool // indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK. // The inner map maps from index name to IndexerFunc. @@ -267,6 +257,9 @@ func (f *ClientBuilder) WithReturnManagedFields() *ClientBuilder { // Build builds and returns a new fake client. 
func (f *ClientBuilder) Build() client.WithWatch { + if f.isBuilt { + panic("Build() must not be called multiple times on the same ClientBuilder") + } if f.scheme == nil { f.scheme = scheme.Scheme } @@ -309,7 +302,7 @@ func (f *ClientBuilder) Build() client.WithWatch { usesFieldManagedObjectTracker = true } tracker := versionedTracker{ - ObjectTracker: f.objectTracker, + upstream: f.objectTracker, scheme: f.scheme, withStatusSubresource: withStatusSubResource, usesFieldManagedObjectTracker: usesFieldManagedObjectTracker, @@ -344,88 +337,12 @@ func (f *ClientBuilder) Build() client.WithWatch { result = interceptor.NewClient(result, *f.interceptorFuncs) } + f.isBuilt = true return result } const trackerAddResourceVersion = "999" -func (t versionedTracker) Add(obj runtime.Object) error { - var objects []runtime.Object - if meta.IsListType(obj) { - var err error - objects, err = meta.ExtractList(obj) - if err != nil { - return err - } - } else { - objects = []runtime.Object{obj} - } - for _, obj := range objects { - accessor, err := meta.Accessor(obj) - if err != nil { - return fmt.Errorf("failed to get accessor for object: %w", err) - } - if accessor.GetDeletionTimestamp() != nil && len(accessor.GetFinalizers()) == 0 { - return fmt.Errorf("refusing to create obj %s with metadata.deletionTimestamp but no finalizers", accessor.GetName()) - } - if accessor.GetResourceVersion() == "" { - // We use a "magic" value of 999 here because this field - // is parsed as uint and and 0 is already used in Update. - // As we can't go lower, go very high instead so this can - // be recognized - accessor.SetResourceVersion(trackerAddResourceVersion) - } - - obj, err = convertFromUnstructuredIfNecessary(t.scheme, obj) - if err != nil { - return err - } - - // If the fieldManager can not decode fields, it will just silently clear them. This is pretty - // much guaranteed not to be what someone that initializes a fake client with objects that - // have them set wants, so validate them here.
- // Ref https://github.com/kubernetes/kubernetes/blob/a956ef4862993b825bcd524a19260192ff1da72d/staging/src/k8s.io/apimachinery/pkg/util/managedfields/internal/fieldmanager.go#L105 - if t.usesFieldManagedObjectTracker { - if err := managedfields.ValidateManagedFields(accessor.GetManagedFields()); err != nil { - return fmt.Errorf("invalid managedFields on %T: %w", obj, err) - } - } - if err := t.ObjectTracker.Add(obj); err != nil { - return err - } - } - - return nil -} - -func (t versionedTracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string, opts ...metav1.CreateOptions) error { - accessor, err := meta.Accessor(obj) - if err != nil { - return fmt.Errorf("failed to get accessor for object: %w", err) - } - if accessor.GetName() == "" { - gvk, _ := apiutil.GVKForObject(obj, t.scheme) - return apierrors.NewInvalid( - gvk.GroupKind(), - accessor.GetName(), - field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) - } - if accessor.GetResourceVersion() != "" { - return apierrors.NewBadRequest("resourceVersion can not be set for Create requests") - } - accessor.SetResourceVersion("1") - obj, err = convertFromUnstructuredIfNecessary(t.scheme, obj) - if err != nil { - return err - } - if err := t.ObjectTracker.Create(gvr, obj, ns, opts...); err != nil { - accessor.SetResourceVersion("") - return err - } - - return nil -} - // convertFromUnstructuredIfNecessary will convert runtime.Unstructured for a GVK that is recognized // by the schema into the whatever the schema produces with New() for said GVK. // This is required because the tracker unconditionally saves on manipulations, but its List() implementation @@ -460,151 +377,6 @@ func convertFromUnstructuredIfNecessary(s *runtime.Scheme, o runtime.Object) (ru return typed, nil } -func (t versionedTracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string, opts ...metav1.UpdateOptions) error { - updateOpts, err := getSingleOrZeroOptions(opts) - if err != nil { - return err - } - - return t.update(gvr, obj, ns, false, false, updateOpts) -} - -func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Object, ns string, isStatus, deleting bool, opts metav1.UpdateOptions) error { - gvk, err := apiutil.GVKForObject(obj, t.scheme) - if err != nil { - return err - } - obj, err = t.updateObject(gvr, obj, ns, isStatus, deleting, opts.DryRun) - if err != nil { - return err - } - if obj == nil { - return nil - } - - if u, unstructured := obj.(*unstructured.Unstructured); unstructured { - u.SetGroupVersionKind(gvk) - } - - return t.ObjectTracker.Update(gvr, obj, ns, opts) -} - -func (t versionedTracker) Patch(gvr schema.GroupVersionResource, obj runtime.Object, ns string, opts ...metav1.PatchOptions) error { - patchOptions, err := getSingleOrZeroOptions(opts) - if err != nil { - return err - } - - // We apply patches using a client-go reaction that ends up calling the trackers Patch. As we can't change - // that reaction, we use the callstack to figure out if this originated from the status client. 
- isStatus := bytes.Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeSubResourceClient).statusPatch")) - - obj, err = t.updateObject(gvr, obj, ns, isStatus, false, patchOptions.DryRun) - if err != nil { - return err - } - if obj == nil { - return nil - } - - return t.ObjectTracker.Patch(gvr, obj, ns, patchOptions) -} - -func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runtime.Object, ns string, isStatus, deleting bool, dryRun []string) (runtime.Object, error) { - accessor, err := meta.Accessor(obj) - if err != nil { - return nil, fmt.Errorf("failed to get accessor for object: %w", err) - } - - if accessor.GetName() == "" { - gvk, _ := apiutil.GVKForObject(obj, t.scheme) - return nil, apierrors.NewInvalid( - gvk.GroupKind(), - accessor.GetName(), - field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) - } - - gvk, err := apiutil.GVKForObject(obj, t.scheme) - if err != nil { - return nil, err - } - - oldObject, err := t.ObjectTracker.Get(gvr, ns, accessor.GetName()) - if err != nil { - // If the resource is not found and the resource allows create on update, issue a - // create instead. - if apierrors.IsNotFound(err) && allowsCreateOnUpdate(gvk) { - return nil, t.Create(gvr, obj, ns) - } - return nil, err - } - - if t.withStatusSubresource.Has(gvk) { - if isStatus { // copy everything but status and metadata.ResourceVersion from original object - if err := copyStatusFrom(obj, oldObject); err != nil { - return nil, fmt.Errorf("failed to copy non-status field for object with status subresouce: %w", err) - } - passedRV := accessor.GetResourceVersion() - if err := copyFrom(oldObject, obj); err != nil { - return nil, fmt.Errorf("failed to restore non-status fields: %w", err) - } - accessor.SetResourceVersion(passedRV) - } else { // copy status from original object - if err := copyStatusFrom(oldObject, obj); err != nil { - return nil, fmt.Errorf("failed to copy the status for object with status subresource: %w", err) - } - } - } else if isStatus { - return nil, apierrors.NewNotFound(gvr.GroupResource(), accessor.GetName()) - } - - oldAccessor, err := meta.Accessor(oldObject) - if err != nil { - return nil, err - } - - // If the new object does not have the resource version set and it allows unconditional update, - // default it to the resource version of the existing resource - if accessor.GetResourceVersion() == "" { - switch { - case allowsUnconditionalUpdate(gvk): - accessor.SetResourceVersion(oldAccessor.GetResourceVersion()) - // This is needed because if the patch explicitly sets the RV to null, the client-go reaction we use - // to apply it and whose output we process here will have it unset. It is not clear why the Kubernetes - // apiserver accepts such a patch, but it does so we just copy that behavior. - // Kubernetes apiserver behavior can be checked like this: - // `kubectl patch configmap foo --patch '{"metadata":{"annotations":{"foo":"bar"},"resourceVersion":null}}' -v=9` - case bytes. - Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeClient).Patch")): - // We apply patches using a client-go reaction that ends up calling the trackers Update. As we can't change - // that reaction, we use the callstack to figure out if this originated from the "fakeClient.Patch" func. 
- accessor.SetResourceVersion(oldAccessor.GetResourceVersion()) - } - } - - if accessor.GetResourceVersion() != oldAccessor.GetResourceVersion() { - return nil, apierrors.NewConflict(gvr.GroupResource(), accessor.GetName(), errors.New("object was modified")) - } - if oldAccessor.GetResourceVersion() == "" { - oldAccessor.SetResourceVersion("0") - } - intResourceVersion, err := strconv.ParseUint(oldAccessor.GetResourceVersion(), 10, 64) - if err != nil { - return nil, fmt.Errorf("can not convert resourceVersion %q to int: %w", oldAccessor.GetResourceVersion(), err) - } - intResourceVersion++ - accessor.SetResourceVersion(strconv.FormatUint(intResourceVersion, 10)) - - if !deleting && !deletionTimestampEqual(accessor, oldAccessor) { - return nil, fmt.Errorf("error: Unable to edit %s: metadata.deletionTimestamp field is immutable", accessor.GetName()) - } - - if !accessor.GetDeletionTimestamp().IsZero() && len(accessor.GetFinalizers()) == 0 { - return nil, t.ObjectTracker.Delete(gvr, accessor.GetNamespace(), accessor.GetName(), metav1.DeleteOptions{DryRun: dryRun}) - } - return convertFromUnstructuredIfNecessary(t.scheme, obj) -} - func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { if err := c.addToSchemeIfUnknownAndUnstructuredOrPartial(obj); err != nil { return err @@ -1171,9 +943,15 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client return err } - // SSA deletionTimestamp updates are silently ignored - if patch.Type() == types.ApplyPatchType && !isApplyCreate { - obj.SetDeletionTimestamp(oldAccessor.GetDeletionTimestamp()) + if patch.Type() == types.ApplyPatchType { + if isApplyCreate { + // Overwrite it unconditionally, this matches the apiserver behavior + // which allows to set it on create, but will then ignore it. + obj.SetResourceVersion("1") + } else { + // SSA deletionTimestamp updates are silently ignored + obj.SetDeletionTimestamp(oldAccessor.GetDeletionTimestamp()) + } } data, err := patch.Data(obj) @@ -1222,6 +1000,15 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client reaction := testing.ObjectReaction(c.tracker) handled, o, err := reaction(action) if err != nil { + // The reaction calls tracker.Get after tracker.Apply to return the object, + // but we may have deleted it in tracker.Apply if there was no finalizer + // left. 
+ if apierrors.IsNotFound(err) && + patch.Type() == types.ApplyPatchType && + oldAccessor.GetDeletionTimestamp() != nil && + len(obj.GetFinalizers()) == 0 { + return nil + } return err } if !handled { @@ -1548,6 +1335,42 @@ func (sw *fakeSubResourceClient) statusPatch(body client.Object, patch client.Pa return sw.client.patch(body, patch, &patchOptions.PatchOptions) } +func (sw *fakeSubResourceClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error { + if sw.subResource != "status" { + return errors.New("fakeSubResourceClient currently only supports Apply for status subresource") + } + + applyOpts := &client.SubResourceApplyOptions{} + applyOpts.ApplyOpts(opts) + + data, err := json.Marshal(obj) + if err != nil { + return fmt.Errorf("failed to marshal apply configuration: %w", err) + } + + u := &unstructured.Unstructured{} + if err := json.Unmarshal(data, u); err != nil { + return fmt.Errorf("failed to unmarshal apply configuration: %w", err) + } + + patchOpts := &client.SubResourcePatchOptions{} + patchOpts.Raw = applyOpts.AsPatchOptions() + + if applyOpts.SubResourceBody != nil { + subResourceBodySerialized, err := json.Marshal(applyOpts.SubResourceBody) + if err != nil { + return fmt.Errorf("failed to serialize subresource body: %w", err) + } + subResourceBody := &unstructured.Unstructured{} + if err := json.Unmarshal(subResourceBodySerialized, subResourceBody); err != nil { + return fmt.Errorf("failed to unmarshal subresource body: %w", err) + } + patchOpts.SubResourceBody = subResourceBody + } + + return sw.Patch(ctx, u, &fakeApplyPatch{}, patchOpts) +} + func allowsUnconditionalUpdate(gvk schema.GroupVersionKind) bool { switch gvk.Group { case "apps": diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index beb8d38433..36722b4ddc 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -660,6 +660,19 @@ var _ = Describe("Fake client", func() { Expect(obj.ObjectMeta.ResourceVersion).To(Equal(trackerAddResourceVersion)) }) + It("should reject apply with non-matching ResourceVersion", func(ctx SpecContext) { + cl := NewClientBuilder().WithRuntimeObjects(cm).Build() + applyCM := corev1applyconfigurations.ConfigMap(cm.Name, cm.Namespace).WithResourceVersion("0") + err := cl.Apply(ctx, applyCM, client.FieldOwner("test")) + Expect(apierrors.IsConflict(err)).To(BeTrue()) + + obj := &corev1.ConfigMap{} + err = cl.Get(ctx, client.ObjectKeyFromObject(cm), obj) + Expect(err).ToNot(HaveOccurred()) + Expect(obj).To(Equal(cm)) + Expect(obj.ObjectMeta.ResourceVersion).To(Equal(trackerAddResourceVersion)) + }) + It("should reject Delete with a mismatched ResourceVersion", func(ctx SpecContext) { bogusRV := "bogus" By("Deleting with a mismatched ResourceVersion Precondition") @@ -714,6 +727,35 @@ var _ = Describe("Fake client", func() { Expect(list.Items).To(ConsistOf(*dep2)) }) + It("should handle finalizers in Apply ", func(ctx SpecContext) { + cl := client.WithFieldOwner(cl, "test") + + By("Creating the object with a finalizer") + cm := corev1applyconfigurations.ConfigMap("test-cm", "delete-with-finalizers"). 
+ WithFinalizers("finalizers.sigs.k8s.io/test") + Expect(cl.Apply(ctx, cm)).To(Succeed()) + + By("Deleting the object") + obj := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{ + Name: *cm.Name, + Namespace: *cm.Namespace, + }} + Expect(cl.Delete(ctx, obj)).NotTo(HaveOccurred()) + + By("Getting the object") + Expect(cl.Get(ctx, client.ObjectKeyFromObject(obj), obj)).NotTo(HaveOccurred()) + Expect(obj.DeletionTimestamp).NotTo(BeNil()) + + By("Removing the finalizer through SSA") + cm.ResourceVersion = nil + cm.Finalizers = nil + Expect(cl.Apply(ctx, cm)).NotTo(HaveOccurred()) + + By("Getting the object") + err := cl.Get(ctx, client.ObjectKeyFromObject(obj), &corev1.ConfigMap{}) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }) + It("should handle finalizers on Update", func(ctx SpecContext) { namespacedName := types.NamespacedName{ Name: "test-cm", @@ -1733,6 +1775,65 @@ var _ = Describe("Fake client", func() { Expect(cmp.Diff(objOriginal, actual)).To(BeEmpty()) }) + It("should not change the status of objects with status subresource when creating through apply ", func(ctx SpecContext) { + obj := corev1applyconfigurations. + Pod("node", ""). + WithStatus( + corev1applyconfigurations.PodStatus().WithPhase("Running"), + ) + + cl := NewClientBuilder().WithStatusSubresource(&corev1.Pod{}).Build() + Expect(cl.Apply(ctx, obj, client.FieldOwner("test"))).To(Succeed()) + + p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: *obj.Name}} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(p), p)).To(Succeed()) + + Expect(p.Status).To(BeComparableTo(corev1.PodStatus{})) + }) + + It("should not change the status of objects with status subresource when updating through apply ", func(ctx SpecContext) { + + cl := NewClientBuilder().WithStatusSubresource(&corev1.Pod{}).Build() + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod"}} + Expect(cl.Create(ctx, pod)).NotTo(HaveOccurred()) + + obj := corev1applyconfigurations. + Pod(pod.Name, ""). + WithStatus( + corev1applyconfigurations.PodStatus().WithPhase("Running"), + ) + Expect(cl.Apply(ctx, obj, client.FieldOwner("test"))).To(Succeed()) + + Expect(cl.Get(ctx, client.ObjectKeyFromObject(pod), pod)).To(Succeed()) + + Expect(pod.Status).To(BeComparableTo(corev1.PodStatus{})) + }) + + It("should only change status on status apply", func(ctx SpecContext) { + initial := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + }, + Spec: corev1.NodeSpec{ + PodCIDR: "old-cidr", + }, + } + cl := NewClientBuilder().WithStatusSubresource(&corev1.Node{}).WithObjects(initial).Build() + + ac := corev1applyconfigurations.Node(initial.Name). + WithSpec(corev1applyconfigurations.NodeSpec().WithPodCIDR(initial.Spec.PodCIDR + "-updated")). 
+ WithStatus(corev1applyconfigurations.NodeStatus().WithPhase(corev1.NodeRunning)) + + Expect(cl.Status().Apply(ctx, ac, client.FieldOwner("test-owner"))).To(Succeed()) + + actual := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: initial.Name}} + Expect(cl.Get(ctx, client.ObjectKeyFromObject(actual), actual)).To(Succeed()) + + initial.ResourceVersion = actual.ResourceVersion + initial.Status = actual.Status + Expect(initial).To(BeComparableTo(actual)) + }) + It("should Unmarshal the schemaless object with int64 to preserve ints", func(ctx SpecContext) { schemeBuilder := &scheme.Builder{GroupVersion: schema.GroupVersion{Group: "test", Version: "v1"}} schemeBuilder.Register(&WithSchemalessSpec{}) @@ -2781,6 +2882,17 @@ var _ = Describe("Fake client", func() { Expect(cm.Data).To(BeComparableTo(map[string]string{"other": "data"})) }) + It("returns a conflict when trying to Create an object with UID set through Apply", func(ctx SpecContext) { + cl := NewClientBuilder().Build() + obj := corev1applyconfigurations. + ConfigMap("foo", "default"). + WithUID("123") + + err := cl.Apply(ctx, obj, &client.ApplyOptions{FieldManager: "test-manager"}) + Expect(err).To(HaveOccurred()) + Expect(apierrors.IsConflict(err)).To(BeTrue()) + }) + It("errors when trying to server-side apply an object without configuring a FieldManager", func(ctx SpecContext) { cl := NewClientBuilder().Build() obj := corev1applyconfigurations. @@ -2827,7 +2939,7 @@ var _ = Describe("Fake client", func() { Expect(result.Object["spec"]).To(Equal(map[string]any{"other": "data"})) }) - It("sets managed fields through all methods", func(ctx SpecContext) { + It("sets the fieldManager in create, patch and update", func(ctx SpecContext) { owner := "test-owner" cl := client.WithFieldOwner( NewClientBuilder().WithReturnManagedFields().Build(), owner, ) @@ -2861,6 +2973,20 @@ var _ = Describe("Fake client", func() { } }) + It("sets the fieldManager when creating through update", func(ctx SpecContext) { + owner := "test-owner" + cl := client.WithFieldOwner( + NewClientBuilder().WithReturnManagedFields().Build(), + owner, + ) + + obj := &corev1.Event{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + Expect(cl.Update(ctx, obj, client.FieldOwner(owner))).NotTo(HaveOccurred()) + for _, f := range obj.ManagedFields { + Expect(f.Manager).To(BeEquivalentTo(owner)) + } + }) + // GH-3267 It("Doesn't leave stale data when updating an object through SSA", func(ctx SpecContext) { obj := corev1applyconfigurations. @@ -2877,6 +3003,29 @@ var _ = Describe("Fake client", func() { Expect(len(cms.Items)).To(BeEquivalentTo(1)) }) + It("sets resourceVersion on SSA create", func(ctx SpecContext) { + obj := corev1applyconfigurations. + ConfigMap("foo", "default"). + WithData(map[string]string{"some": "data"}) + + cl := NewClientBuilder().Build() + Expect(cl.Apply(ctx, obj, client.FieldOwner("foo"))).NotTo(HaveOccurred()) + // Ideally we should only test for it to not be empty; realistically we will + // break people if we ever start setting a different value. + Expect(obj.ResourceVersion).To(BeEquivalentTo(ptr.To("1"))) + }) + + It("ignores a passed resourceVersion on SSA create", func(ctx SpecContext) { + obj := corev1applyconfigurations. + ConfigMap("foo", "default"). + WithData(map[string]string{"some": "data"}).
+ WithResourceVersion("1234") + + cl := NewClientBuilder().Build() + Expect(cl.Apply(ctx, obj, client.FieldOwner("foo"))).NotTo(HaveOccurred()) + Expect(obj.ResourceVersion).To(BeEquivalentTo(ptr.To("1"))) + }) + It("allows to set deletionTimestamp on an object during SSA create", func(ctx SpecContext) { now := metav1.Time{Time: time.Now().Round(time.Second)} obj := corev1applyconfigurations. @@ -3159,4 +3308,13 @@ var _ = Describe("Fake client builder", func() { Expect(err).NotTo(HaveOccurred()) Expect(called).To(BeTrue()) }) + + It("should panic when calling Build() more than once", func() { + cb := NewClientBuilder() + anotherCb := cb + cb.Build() + Expect(func() { + anotherCb.Build() + }).To(Panic()) + }) }) diff --git a/pkg/client/fake/versioned_tracker.go b/pkg/client/fake/versioned_tracker.go new file mode 100644 index 0000000000..bc1eaeb951 --- /dev/null +++ b/pkg/client/fake/versioned_tracker.go @@ -0,0 +1,361 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "bytes" + "errors" + "fmt" + "runtime/debug" + "strconv" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/managedfields" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/testing" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +var _ testing.ObjectTracker = (*versionedTracker)(nil) + +type versionedTracker struct { + upstream testing.ObjectTracker + scheme *runtime.Scheme + withStatusSubresource sets.Set[schema.GroupVersionKind] + usesFieldManagedObjectTracker bool +} + +func (t versionedTracker) Add(obj runtime.Object) error { + var objects []runtime.Object + if meta.IsListType(obj) { + var err error + objects, err = meta.ExtractList(obj) + if err != nil { + return err + } + } else { + objects = []runtime.Object{obj} + } + for _, obj := range objects { + accessor, err := meta.Accessor(obj) + if err != nil { + return fmt.Errorf("failed to get accessor for object: %w", err) + } + if accessor.GetDeletionTimestamp() != nil && len(accessor.GetFinalizers()) == 0 { + return fmt.Errorf("refusing to create obj %s with metadata.deletionTimestamp but no finalizers", accessor.GetName()) + } + if accessor.GetResourceVersion() == "" { + // We use a "magic" value of 999 here because this field + // is parsed as uint and 0 is already used in Update. + // As we can't go lower, go very high instead so this can + // be recognized + accessor.SetResourceVersion(trackerAddResourceVersion) + } + + obj, err = convertFromUnstructuredIfNecessary(t.scheme, obj) + if err != nil { + return err + } + + // If the fieldManager can not decode fields, it will just silently clear them.
This is pretty + // much guaranteed not to be what someone that initializes a fake client with objects that + // have them set wants, so validate them here. + // Ref https://github.com/kubernetes/kubernetes/blob/a956ef4862993b825bcd524a19260192ff1da72d/staging/src/k8s.io/apimachinery/pkg/util/managedfields/internal/fieldmanager.go#L105 + if t.usesFieldManagedObjectTracker { + if err := managedfields.ValidateManagedFields(accessor.GetManagedFields()); err != nil { + return fmt.Errorf("invalid managedFields on %T: %w", obj, err) + } + } + if err := t.upstream.Add(obj); err != nil { + return err + } + } + + return nil +} + +func (t versionedTracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string, opts ...metav1.CreateOptions) error { + accessor, err := meta.Accessor(obj) + if err != nil { + return fmt.Errorf("failed to get accessor for object: %w", err) + } + if accessor.GetName() == "" { + gvk, _ := apiutil.GVKForObject(obj, t.scheme) + return apierrors.NewInvalid( + gvk.GroupKind(), + accessor.GetName(), + field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) + } + if accessor.GetResourceVersion() != "" { + return apierrors.NewBadRequest("resourceVersion can not be set for Create requests") + } + accessor.SetResourceVersion("1") + obj, err = convertFromUnstructuredIfNecessary(t.scheme, obj) + if err != nil { + return err + } + if err := t.upstream.Create(gvr, obj, ns, opts...); err != nil { + accessor.SetResourceVersion("") + return err + } + + return nil +} + +func (t versionedTracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string, opts ...metav1.UpdateOptions) error { + updateOpts, err := getSingleOrZeroOptions(opts) + if err != nil { + return err + } + + return t.update(gvr, obj, ns, false, false, updateOpts) +} + +func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Object, ns string, isStatus, deleting bool, opts metav1.UpdateOptions) error { + gvk, err := apiutil.GVKForObject(obj, t.scheme) + if err != nil { + return err + } + obj, needsCreate, err := t.updateObject(gvr, gvk, obj, ns, isStatus, deleting, allowsCreateOnUpdate(gvk), opts.DryRun) + if err != nil { + return err + } + + if needsCreate { + opts := metav1.CreateOptions{DryRun: opts.DryRun, FieldManager: opts.FieldManager} + return t.Create(gvr, obj, ns, opts) + } + + if obj == nil { // Object was deleted in updateObject + return nil + } + + if u, unstructured := obj.(*unstructured.Unstructured); unstructured { + u.SetGroupVersionKind(gvk) + } + + return t.upstream.Update(gvr, obj, ns, opts) +} + +func (t versionedTracker) Patch(gvr schema.GroupVersionResource, obj runtime.Object, ns string, opts ...metav1.PatchOptions) error { + patchOptions, err := getSingleOrZeroOptions(opts) + if err != nil { + return err + } + + gvk, err := apiutil.GVKForObject(obj, t.scheme) + if err != nil { + return err + } + + // We apply patches using a client-go reaction that ends up calling the trackers Patch. As we can't change + // that reaction, we use the callstack to figure out if this originated from the status client. 
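+	// Note: this matches the fully qualified frame name that debug.Stack prints, so it
+	// has to be kept in sync if fakeSubResourceClient.statusPatch is ever renamed or moved.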
+ isStatus := bytes.Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeSubResourceClient).statusPatch")) + + obj, needsCreate, err := t.updateObject(gvr, gvk, obj, ns, isStatus, false, allowsCreateOnUpdate(gvk), patchOptions.DryRun) + if err != nil { + return err + } + if needsCreate { + opts := metav1.CreateOptions{DryRun: patchOptions.DryRun, FieldManager: patchOptions.FieldManager} + return t.Create(gvr, obj, ns, opts) + } + + if obj == nil { // Object was deleted in updateObject + return nil + } + + return t.upstream.Patch(gvr, obj, ns, patchOptions) +} + +// updateObject performs a number of validations and changes related to +// object updates, such as checking and updating the resourceVersion. +func (t versionedTracker) updateObject( + gvr schema.GroupVersionResource, + gvk schema.GroupVersionKind, + obj runtime.Object, + ns string, + isStatus bool, + deleting bool, + allowCreateOnUpdate bool, + dryRun []string, +) (result runtime.Object, needsCreate bool, _ error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, false, fmt.Errorf("failed to get accessor for object: %w", err) + } + + if accessor.GetName() == "" { + return nil, false, apierrors.NewInvalid( + gvk.GroupKind(), + accessor.GetName(), + field.ErrorList{field.Required(field.NewPath("metadata.name"), "name is required")}) + } + + oldObject, err := t.Get(gvr, ns, accessor.GetName()) + if err != nil { + // If the resource is not found and the resource allows create on update, issue a + // create instead. + if apierrors.IsNotFound(err) && allowCreateOnUpdate { + // Pass this info to the caller rather than create, because in the SSA case it + // must be created by calling Apply in the upstream tracker, not Create. + // This is because SSA considers Apply and Non-Apply operations to be different + // even when they use the same fieldManager. This behavior is also observable + // with a real Kubernetes apiserver. 
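+			// In the object's managedFields this distinction is visible as separate
+			// entries that share a manager name but differ in operation (Apply vs. Update).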
+			//
+			// Ref https://kubernetes.slack.com/archives/C0EG7JC6T/p1757868204458989?thread_ts=1757808656.002569&cid=C0EG7JC6T
+			return obj, true, nil
+		}
+		return obj, false, err
+	}
+
+	if t.withStatusSubresource.Has(gvk) {
+		if isStatus { // copy everything but status and metadata.ResourceVersion from original object
+			if err := copyStatusFrom(obj, oldObject); err != nil {
+				return nil, false, fmt.Errorf("failed to copy non-status field for object with status subresource: %w", err)
+			}
+			passedRV := accessor.GetResourceVersion()
+			if err := copyFrom(oldObject, obj); err != nil {
+				return nil, false, fmt.Errorf("failed to restore non-status fields: %w", err)
+			}
+			accessor.SetResourceVersion(passedRV)
+		} else { // copy status from original object
+			if err := copyStatusFrom(oldObject, obj); err != nil {
+				return nil, false, fmt.Errorf("failed to copy the status for object with status subresource: %w", err)
+			}
+		}
+	} else if isStatus {
+		return nil, false, apierrors.NewNotFound(gvr.GroupResource(), accessor.GetName())
+	}
+
+	oldAccessor, err := meta.Accessor(oldObject)
+	if err != nil {
+		return nil, false, err
+	}
+
+	// If the new object does not have the resource version set and it allows unconditional update,
+	// default it to the resource version of the existing resource.
+	if accessor.GetResourceVersion() == "" {
+		switch {
+		case allowsUnconditionalUpdate(gvk):
+			accessor.SetResourceVersion(oldAccessor.GetResourceVersion())
+		// This is needed because if the patch explicitly sets the RV to null, the client-go reaction we use
+		// to apply it and whose output we process here will have it unset. It is not clear why the Kubernetes
+		// apiserver accepts such a patch, but it does, so we just copy that behavior.
+		// Kubernetes apiserver behavior can be checked like this:
+		// `kubectl patch configmap foo --patch '{"metadata":{"annotations":{"foo":"bar"},"resourceVersion":null}}' -v=9`
+		case bytes.
+			Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeClient).Patch")):
+			// We apply patches using a client-go reaction that ends up calling the tracker's Update. As we can't change
+			// that reaction, we use the callstack to figure out if this originated from the "fakeClient.Patch" func.
+ accessor.SetResourceVersion(oldAccessor.GetResourceVersion()) + } + } + + if accessor.GetResourceVersion() != oldAccessor.GetResourceVersion() { + return nil, false, apierrors.NewConflict(gvr.GroupResource(), accessor.GetName(), errors.New("object was modified")) + } + if oldAccessor.GetResourceVersion() == "" { + oldAccessor.SetResourceVersion("0") + } + intResourceVersion, err := strconv.ParseUint(oldAccessor.GetResourceVersion(), 10, 64) + if err != nil { + return nil, false, fmt.Errorf("can not convert resourceVersion %q to int: %w", oldAccessor.GetResourceVersion(), err) + } + intResourceVersion++ + accessor.SetResourceVersion(strconv.FormatUint(intResourceVersion, 10)) + + if !deleting && !deletionTimestampEqual(accessor, oldAccessor) { + return nil, false, fmt.Errorf("error: Unable to edit %s: metadata.deletionTimestamp field is immutable", accessor.GetName()) + } + + if !accessor.GetDeletionTimestamp().IsZero() && len(accessor.GetFinalizers()) == 0 { + return nil, false, t.Delete(gvr, accessor.GetNamespace(), accessor.GetName(), metav1.DeleteOptions{DryRun: dryRun}) + } + + obj, err = convertFromUnstructuredIfNecessary(t.scheme, obj) + return obj, false, err +} + +func (t versionedTracker) Apply(gvr schema.GroupVersionResource, applyConfiguration runtime.Object, ns string, opts ...metav1.PatchOptions) error { + patchOptions, err := getSingleOrZeroOptions(opts) + if err != nil { + return err + } + gvk, err := apiutil.GVKForObject(applyConfiguration, t.scheme) + if err != nil { + return err + } + isStatus := bytes.Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeSubResourceClient).statusPatch")) + + applyConfiguration, needsCreate, err := t.updateObject(gvr, gvk, applyConfiguration, ns, isStatus, false, true, patchOptions.DryRun) + if err != nil { + return err + } + + if needsCreate { + // https://github.com/kubernetes/kubernetes/blob/81affffa1b8d8079836f4cac713ea8d1b2bbf10f/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go#L606 + accessor, err := meta.Accessor(applyConfiguration) + if err != nil { + return fmt.Errorf("failed to get accessor for object: %w", err) + } + if accessor.GetUID() != "" { + return apierrors.NewConflict(gvr.GroupResource(), accessor.GetName(), fmt.Errorf("uid mismatch: the provided object specified uid %s, and no existing object was found", accessor.GetUID())) + } + + if t.withStatusSubresource.Has(gvk) { + // Clear out status for create, for update this is handled in updateObject + if err := copyStatusFrom(&unstructured.Unstructured{}, applyConfiguration); err != nil { + return err + } + } + } + + if applyConfiguration == nil { // Object was deleted in updateObject + return nil + } + + if isStatus { + // We restore everything but status from the tracker where we don't put GVK + // into the object but it must be set for the ManagedFieldsObjectTracker + applyConfiguration.GetObjectKind().SetGroupVersionKind(gvk) + } + return t.upstream.Apply(gvr, applyConfiguration, ns, opts...) +} + +func (t versionedTracker) Delete(gvr schema.GroupVersionResource, ns, name string, opts ...metav1.DeleteOptions) error { + return t.upstream.Delete(gvr, ns, name, opts...) +} + +func (t versionedTracker) Get(gvr schema.GroupVersionResource, ns, name string, opts ...metav1.GetOptions) (runtime.Object, error) { + return t.upstream.Get(gvr, ns, name, opts...) 
+} + +func (t versionedTracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string, opts ...metav1.ListOptions) (runtime.Object, error) { + return t.upstream.List(gvr, gvk, ns, opts...) +} + +func (t versionedTracker) Watch(gvr schema.GroupVersionResource, ns string, opts ...metav1.ListOptions) (watch.Interface, error) { + return t.upstream.Watch(gvr, ns, opts...) +} diff --git a/pkg/client/fieldowner.go b/pkg/client/fieldowner.go index 93274f9500..5d9437ba91 100644 --- a/pkg/client/fieldowner.go +++ b/pkg/client/fieldowner.go @@ -108,3 +108,7 @@ func (f *subresourceClientWithFieldOwner) Update(ctx context.Context, obj Object func (f *subresourceClientWithFieldOwner) Patch(ctx context.Context, obj Object, patch Patch, opts ...SubResourcePatchOption) error { return f.subresourceWriter.Patch(ctx, obj, patch, append([]SubResourcePatchOption{FieldOwner(f.owner)}, opts...)...) } + +func (f *subresourceClientWithFieldOwner) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...SubResourceApplyOption) error { + return f.subresourceWriter.Apply(ctx, obj, append([]SubResourceApplyOption{FieldOwner(f.owner)}, opts...)...) +} diff --git a/pkg/client/fieldowner_test.go b/pkg/client/fieldowner_test.go index 95cb4e0f91..069abbc115 100644 --- a/pkg/client/fieldowner_test.go +++ b/pkg/client/fieldowner_test.go @@ -21,6 +21,8 @@ import ( "testing" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" @@ -33,18 +35,22 @@ func TestWithFieldOwner(t *testing.T) { ctx := t.Context() dummyObj := &corev1.Namespace{} + dummyObjectAC := corev1applyconfigurations.Namespace(dummyObj.Name) _ = wrappedClient.Create(ctx, dummyObj) _ = wrappedClient.Update(ctx, dummyObj) _ = wrappedClient.Patch(ctx, dummyObj, nil) + _ = wrappedClient.Apply(ctx, dummyObjectAC) _ = wrappedClient.Status().Create(ctx, dummyObj, dummyObj) _ = wrappedClient.Status().Update(ctx, dummyObj) _ = wrappedClient.Status().Patch(ctx, dummyObj, nil) + _ = wrappedClient.Status().Apply(ctx, dummyObjectAC) _ = wrappedClient.SubResource("some-subresource").Create(ctx, dummyObj, dummyObj) _ = wrappedClient.SubResource("some-subresource").Update(ctx, dummyObj) _ = wrappedClient.SubResource("some-subresource").Patch(ctx, dummyObj, nil) + _ = wrappedClient.SubResource("some-subresource").Apply(ctx, dummyObjectAC) - if expectedCalls := 9; calls != expectedCalls { + if expectedCalls := 12; calls != expectedCalls { t.Fatalf("wrong number of calls to assertions: expected=%d; got=%d", expectedCalls, calls) } } @@ -57,18 +63,22 @@ func TestWithFieldOwnerOverridden(t *testing.T) { ctx := t.Context() dummyObj := &corev1.Namespace{} + dummyObjectAC := corev1applyconfigurations.Namespace(dummyObj.Name) _ = wrappedClient.Create(ctx, dummyObj, client.FieldOwner("new-field-manager")) _ = wrappedClient.Update(ctx, dummyObj, client.FieldOwner("new-field-manager")) _ = wrappedClient.Patch(ctx, dummyObj, nil, client.FieldOwner("new-field-manager")) + _ = wrappedClient.Apply(ctx, dummyObjectAC, client.FieldOwner("new-field-manager")) _ = wrappedClient.Status().Create(ctx, dummyObj, dummyObj, client.FieldOwner("new-field-manager")) _ = wrappedClient.Status().Update(ctx, dummyObj, client.FieldOwner("new-field-manager")) _ = wrappedClient.Status().Patch(ctx, dummyObj, nil, client.FieldOwner("new-field-manager")) + 
_ = wrappedClient.Status().Apply(ctx, dummyObjectAC, client.FieldOwner("new-field-manager"))
	_ = wrappedClient.SubResource("some-subresource").Create(ctx, dummyObj, dummyObj, client.FieldOwner("new-field-manager"))
	_ = wrappedClient.SubResource("some-subresource").Update(ctx, dummyObj, client.FieldOwner("new-field-manager"))
	_ = wrappedClient.SubResource("some-subresource").Patch(ctx, dummyObj, nil, client.FieldOwner("new-field-manager"))
+	_ = wrappedClient.SubResource("some-subresource").Apply(ctx, dummyObjectAC, client.FieldOwner("new-field-manager"))

-	if expectedCalls := 9; calls != expectedCalls {
+	if expectedCalls := 12; calls != expectedCalls {
		t.Fatalf("wrong number of calls to assertions: expected=%d; got=%d", expectedCalls, calls)
	}
}
@@ -144,5 +154,27 @@ func testClient(t *testing.T, expectedFieldManager string, callback func()) clie
			}
			return nil
		},
+		Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
+			callback()
+			out := &client.ApplyOptions{}
+			for _, f := range opts {
+				f.ApplyToApply(out)
+			}
+			if got := out.FieldManager; expectedFieldManager != got {
+				t.Fatalf("wrong field manager: expected=%q; got=%q", expectedFieldManager, got)
+			}
+			return nil
+		},
+		SubResourceApply: func(ctx context.Context, c client.Client, subResourceName string, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error {
+			callback()
+			out := &client.SubResourceApplyOptions{}
+			for _, f := range opts {
+				f.ApplyToSubResourceApply(out)
+			}
+			if got := out.FieldManager; expectedFieldManager != got {
+				t.Fatalf("wrong field manager: expected=%q; got=%q", expectedFieldManager, got)
+			}
+			return nil
+		},
	}).Build()
}
diff --git a/pkg/client/fieldvalidation.go b/pkg/client/fieldvalidation.go
index ce8d0576c7..b0f660854e 100644
--- a/pkg/client/fieldvalidation.go
+++ b/pkg/client/fieldvalidation.go
@@ -27,6 +27,9 @@ import (
 // WithFieldValidation wraps a Client and configures field validation, by
 // default, for all write requests from this client. Users can override field
 // validation for individual write requests.
+//
+// This wrapper has no effect on apply requests, as they do not support a
+// custom fieldValidation setting; it is always strict.
 func WithFieldValidation(c Client, validation FieldValidation) Client {
	return &clientWithFieldValidation{
		validation: validation,
@@ -108,3 +111,7 @@ func (c *subresourceClientWithFieldValidation) Update(ctx context.Context, obj O
 func (c *subresourceClientWithFieldValidation) Patch(ctx context.Context, obj Object, patch Patch, opts ...SubResourcePatchOption) error {
	return c.subresourceWriter.Patch(ctx, obj, patch, append([]SubResourcePatchOption{c.validation}, opts...)...)
 }
+
+func (c *subresourceClientWithFieldValidation) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...SubResourceApplyOption) error {
+	return c.subresourceWriter.Apply(ctx, obj, opts...)
+} diff --git a/pkg/client/fieldvalidation_test.go b/pkg/client/fieldvalidation_test.go index d32ee5717d..6e6e9e5d17 100644 --- a/pkg/client/fieldvalidation_test.go +++ b/pkg/client/fieldvalidation_test.go @@ -92,6 +92,15 @@ var _ = Describe("ClientWithFieldValidation", func() { err = wrappedClient.SubResource("status").Patch(ctx, invalidStatusNode, patch) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("strict decoding error: unknown field \"status.invalidStatusField\"")) + + invalidApplyConfig := client.ApplyConfigurationFromUnstructured(invalidStatusNode) + err = wrappedClient.Status().Apply(ctx, invalidApplyConfig, client.FieldOwner("test-owner")) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("field not declared in schema")) + + err = wrappedClient.SubResource("status").Apply(ctx, invalidApplyConfig, client.FieldOwner("test-owner")) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("field not declared in schema")) }) }) @@ -110,11 +119,13 @@ func TestWithStrictFieldValidation(t *testing.T) { _ = wrappedClient.Status().Create(ctx, dummyObj, dummyObj) _ = wrappedClient.Status().Update(ctx, dummyObj) _ = wrappedClient.Status().Patch(ctx, dummyObj, nil) + _ = wrappedClient.Status().Apply(ctx, corev1applyconfigurations.Namespace(""), nil) _ = wrappedClient.SubResource("some-subresource").Create(ctx, dummyObj, dummyObj) _ = wrappedClient.SubResource("some-subresource").Update(ctx, dummyObj) _ = wrappedClient.SubResource("some-subresource").Patch(ctx, dummyObj, nil) + _ = wrappedClient.SubResource("some-subresource").Apply(ctx, corev1applyconfigurations.Namespace(""), nil) - if expectedCalls := 10; calls != expectedCalls { + if expectedCalls := 12; calls != expectedCalls { t.Fatalf("wrong number of calls to assertions: expected=%d; got=%d", expectedCalls, calls) } } @@ -278,5 +289,9 @@ func testFieldValidationClient(t *testing.T, expectedFieldValidation string, cal } return nil }, + SubResourceApply: func(ctx context.Context, c client.Client, subResourceName string, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error { + callback() + return nil + }, }).Build() } diff --git a/pkg/client/interceptor/intercept.go b/pkg/client/interceptor/intercept.go index 7ff73bd8da..b98af1a693 100644 --- a/pkg/client/interceptor/intercept.go +++ b/pkg/client/interceptor/intercept.go @@ -26,6 +26,7 @@ type Funcs struct { SubResourceCreate func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error SubResourceUpdate func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error SubResourcePatch func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error + SubResourceApply func(ctx context.Context, client client.Client, subResourceName string, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error } // NewClient returns a new interceptor client that calls the functions in funcs instead of the underlying client's methods, if they are not nil. @@ -173,3 +174,10 @@ func (s subResourceInterceptor) Patch(ctx context.Context, obj client.Object, pa } return s.client.SubResource(s.subResourceName).Patch(ctx, obj, patch, opts...) 
}
+
+func (s subResourceInterceptor) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error {
+	if s.funcs.SubResourceApply != nil {
+		return s.funcs.SubResourceApply(ctx, s.client, s.subResourceName, obj, opts...)
+	}
+	return s.client.SubResource(s.subResourceName).Apply(ctx, obj, opts...)
+}
diff --git a/pkg/client/interceptor/intercept_test.go b/pkg/client/interceptor/intercept_test.go
index 26ea5b057e..fb58dfeac1 100644
--- a/pkg/client/interceptor/intercept_test.go
+++ b/pkg/client/interceptor/intercept_test.go
@@ -351,6 +351,31 @@ var _ = Describe("NewSubResourceClient", func() {
		_ = client2.SubResource("foo").Create(ctx, nil, nil)
		Expect(called).To(BeTrue())
	})
+	It("should call the provided Apply function", func(ctx SpecContext) {
+		var called bool
+		client := NewClient(c, Funcs{
+			SubResourceApply: func(_ context.Context, client client.Client, subResourceName string, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error {
+				called = true
+				Expect(subResourceName).To(BeEquivalentTo("foo"))
+				return nil
+			},
+		})
+		_ = client.SubResource("foo").Apply(ctx, nil)
+		Expect(called).To(BeTrue())
+	})
+	It("should call the underlying client if the provided Apply function is nil", func(ctx SpecContext) {
+		var called bool
+		client1 := NewClient(c, Funcs{
+			SubResourceApply: func(_ context.Context, client client.Client, subResourceName string, obj runtime.ApplyConfiguration, opts ...client.SubResourceApplyOption) error {
+				called = true
+				Expect(subResourceName).To(BeEquivalentTo("foo"))
+				return nil
+			},
+		})
+		client2 := NewClient(client1, Funcs{})
+		_ = client2.SubResource("foo").Apply(ctx, nil)
+		Expect(called).To(BeTrue())
+	})
})

type dummyClient struct{}
diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go
index 61559ecbe1..1af1f3a368 100644
--- a/pkg/client/interfaces.go
+++ b/pkg/client/interfaces.go
@@ -155,6 +155,9 @@ type SubResourceWriter interface {
	// pointer so that obj can be updated with the content returned by the
	// Server.
	Patch(ctx context.Context, obj Object, patch Patch, opts ...SubResourcePatchOption) error
+
+	// Apply applies the given apply configuration to the object's subresource.
+	Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...SubResourceApplyOption) error
}

// SubResourceClient knows how to perform CRU operations on Kubernetes objects.
diff --git a/pkg/client/namespaced_client.go b/pkg/client/namespaced_client.go
index cacba4a9c6..445e91b98b 100644
--- a/pkg/client/namespaced_client.go
+++ b/pkg/client/namespaced_client.go
@@ -150,7 +150,7 @@ func (n *namespacedClient) Patch(ctx context.Context, obj Object, patch Patch, o
	return n.client.Patch(ctx, obj, patch, opts...)
}

-func (n *namespacedClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error {
+func (n *namespacedClient) setNamespaceForApplyConfigIfNamespaceScoped(obj runtime.ApplyConfiguration) error {
	var gvk schema.GroupVersionKind
	switch o := obj.(type) {
	case applyConfiguration:
@@ -193,6 +193,14 @@ func (n *namespacedClient) Apply(ctx context.Context, obj runtime.ApplyConfigura
		}
	}

+	return nil
+}
+
+func (n *namespacedClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...ApplyOption) error {
+	if err := n.setNamespaceForApplyConfigIfNamespaceScoped(obj); err != nil {
+		return err
+	}
+
	return n.client.Apply(ctx, obj, opts...)
} @@ -226,7 +234,10 @@ func (n *namespacedClient) Status() SubResourceWriter { // SubResource implements client.SubResourceClient. func (n *namespacedClient) SubResource(subResource string) SubResourceClient { - return &namespacedClientSubResourceClient{client: n.client.SubResource(subResource), namespace: n.namespace, namespacedclient: n} + return &namespacedClientSubResourceClient{ + client: n.client.SubResource(subResource), + namespacedclient: n, + } } // ensure namespacedClientSubResourceClient implements client.SubResourceClient. @@ -234,8 +245,7 @@ var _ SubResourceClient = &namespacedClientSubResourceClient{} type namespacedClientSubResourceClient struct { client SubResourceClient - namespace string - namespacedclient Client + namespacedclient *namespacedClient } func (nsw *namespacedClientSubResourceClient) Get(ctx context.Context, obj, subResource Object, opts ...SubResourceGetOption) error { @@ -245,12 +255,12 @@ func (nsw *namespacedClientSubResourceClient) Get(ctx context.Context, obj, subR } objectNamespace := obj.GetNamespace() - if objectNamespace != nsw.namespace && objectNamespace != "" { - return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespace) + if objectNamespace != nsw.namespacedclient.namespace && objectNamespace != "" { + return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespacedclient.namespace) } if isNamespaceScoped && objectNamespace == "" { - obj.SetNamespace(nsw.namespace) + obj.SetNamespace(nsw.namespacedclient.namespace) } return nsw.client.Get(ctx, obj, subResource, opts...) @@ -263,12 +273,12 @@ func (nsw *namespacedClientSubResourceClient) Create(ctx context.Context, obj, s } objectNamespace := obj.GetNamespace() - if objectNamespace != nsw.namespace && objectNamespace != "" { - return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespace) + if objectNamespace != nsw.namespacedclient.namespace && objectNamespace != "" { + return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespacedclient.namespace) } if isNamespaceScoped && objectNamespace == "" { - obj.SetNamespace(nsw.namespace) + obj.SetNamespace(nsw.namespacedclient.namespace) } return nsw.client.Create(ctx, obj, subResource, opts...) @@ -282,12 +292,12 @@ func (nsw *namespacedClientSubResourceClient) Update(ctx context.Context, obj Ob } objectNamespace := obj.GetNamespace() - if objectNamespace != nsw.namespace && objectNamespace != "" { - return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespace) + if objectNamespace != nsw.namespacedclient.namespace && objectNamespace != "" { + return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespacedclient.namespace) } if isNamespaceScoped && objectNamespace == "" { - obj.SetNamespace(nsw.namespace) + obj.SetNamespace(nsw.namespacedclient.namespace) } return nsw.client.Update(ctx, obj, opts...) 
} @@ -300,12 +310,19 @@ func (nsw *namespacedClientSubResourceClient) Patch(ctx context.Context, obj Obj } objectNamespace := obj.GetNamespace() - if objectNamespace != nsw.namespace && objectNamespace != "" { - return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespace) + if objectNamespace != nsw.namespacedclient.namespace && objectNamespace != "" { + return fmt.Errorf("namespace %s of the object %s does not match the namespace %s on the client", objectNamespace, obj.GetName(), nsw.namespacedclient.namespace) } if isNamespaceScoped && objectNamespace == "" { - obj.SetNamespace(nsw.namespace) + obj.SetNamespace(nsw.namespacedclient.namespace) } return nsw.client.Patch(ctx, obj, patch, opts...) } + +func (nsw *namespacedClientSubResourceClient) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...SubResourceApplyOption) error { + if err := nsw.namespacedclient.setNamespaceForApplyConfigIfNamespaceScoped(obj); err != nil { + return err + } + return nsw.client.Apply(ctx, obj, opts...) +} diff --git a/pkg/client/namespaced_client_test.go b/pkg/client/namespaced_client_test.go index cf28289e72..deae881d4a 100644 --- a/pkg/client/namespaced_client_test.go +++ b/pkg/client/namespaced_client_test.go @@ -37,6 +37,7 @@ import ( corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1" metav1applyconfigurations "k8s.io/client-go/applyconfigurations/meta/v1" rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -613,6 +614,48 @@ var _ = Describe("NamespacedClient", func() { Expect(getClient().SubResource("status").Patch(ctx, changedDep, client.MergeFrom(dep))).To(HaveOccurred()) }) + + It("should change objects via status apply", func(ctx SpecContext) { + deploymentAC, err := appsv1applyconfigurations.ExtractDeployment(dep, "test-owner") + Expect(err).NotTo(HaveOccurred()) + deploymentAC.WithStatus(&appsv1applyconfigurations.DeploymentStatusApplyConfiguration{ + Replicas: ptr.To(int32(99)), + }) + + Expect(getClient().SubResource("status").Apply(ctx, deploymentAC, client.FieldOwner("test-owner"))).To(Succeed()) + + actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(actual).NotTo(BeNil()) + Expect(actual.GetNamespace()).To(BeEquivalentTo(ns)) + Expect(actual.Status.Replicas).To(BeEquivalentTo(99)) + }) + + It("should set namespace on ApplyConfiguration when applying via SubResource", func(ctx SpecContext) { + deploymentAC := appsv1applyconfigurations.Deployment(dep.Name, "") + deploymentAC.WithStatus(&appsv1applyconfigurations.DeploymentStatusApplyConfiguration{ + Replicas: ptr.To(int32(50)), + }) + + Expect(getClient().SubResource("status").Apply(ctx, deploymentAC, client.FieldOwner("test-owner"))).To(Succeed()) + + actual, err := clientset.AppsV1().Deployments(ns).Get(ctx, dep.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(actual).NotTo(BeNil()) + Expect(actual.GetNamespace()).To(BeEquivalentTo(ns)) + Expect(actual.Status.Replicas).To(BeEquivalentTo(50)) + }) + + It("should fail when applying via SubResource with conflicting namespace", func(ctx SpecContext) { + deploymentAC := appsv1applyconfigurations.Deployment(dep.Name, "different-namespace") + deploymentAC.WithStatus(&appsv1applyconfigurations.DeploymentStatusApplyConfiguration{ + Replicas: ptr.To(int32(25)), + }) + + err := 
getClient().SubResource("status").Apply(ctx, deploymentAC, client.FieldOwner("test-owner")) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("namespace")) + }) }) Describe("Test on invalid objects", func() { diff --git a/pkg/client/options.go b/pkg/client/options.go index 33c460738c..a6b921171a 100644 --- a/pkg/client/options.go +++ b/pkg/client/options.go @@ -97,6 +97,12 @@ type SubResourcePatchOption interface { ApplyToSubResourcePatch(*SubResourcePatchOptions) } +// SubResourceApplyOption configures a subresource apply request. +type SubResourceApplyOption interface { + // ApplyToSubResourceApply applies the configuration on the given patch options. + ApplyToSubResourceApply(*SubResourceApplyOptions) +} + // }}} // {{{ Multi-Type Options @@ -148,6 +154,10 @@ func (dryRunAll) ApplyToSubResourcePatch(opts *SubResourcePatchOptions) { opts.DryRun = []string{metav1.DryRunAll} } +func (dryRunAll) ApplyToSubResourceApply(opts *SubResourceApplyOptions) { + opts.DryRun = []string{metav1.DryRunAll} +} + // FieldOwner set the field manager name for the given server-side apply patch. type FieldOwner string @@ -186,6 +196,11 @@ func (f FieldOwner) ApplyToSubResourceUpdate(opts *SubResourceUpdateOptions) { opts.FieldManager = string(f) } +// ApplyToSubResourceApply applies this configuration to the given apply options. +func (f FieldOwner) ApplyToSubResourceApply(opts *SubResourceApplyOptions) { + opts.FieldManager = string(f) +} + // FieldValidation configures field validation for the given requests. type FieldValidation string @@ -949,6 +964,10 @@ func (forceOwnership) ApplyToApply(opts *ApplyOptions) { opts.Force = ptr.To(true) } +func (forceOwnership) ApplyToSubResourceApply(opts *SubResourceApplyOptions) { + opts.Force = ptr.To(true) +} + // }}} // {{{ DeleteAllOf Options diff --git a/pkg/client/options_test.go b/pkg/client/options_test.go index 0aa6a74007..082586bca3 100644 --- a/pkg/client/options_test.go +++ b/pkg/client/options_test.go @@ -374,6 +374,12 @@ var _ = Describe("DryRunAll", func() { t.ApplyToSubResourceUpdate(o) Expect(o.DryRun).To(Equal([]string{metav1.DryRunAll})) }) + It("Should apply to SubResourceApplyOptions", func() { + o := &client.SubResourceApplyOptions{ApplyOptions: client.ApplyOptions{DryRun: []string{"server"}}} + t := client.DryRunAll + t.ApplyToSubResourceApply(o) + Expect(o.DryRun).To(Equal([]string{metav1.DryRunAll})) + }) }) var _ = Describe("FieldOwner", func() { @@ -419,6 +425,12 @@ var _ = Describe("FieldOwner", func() { t.ApplyToSubResourceUpdate(o) Expect(o.FieldManager).To(Equal("foo")) }) + It("Should apply to SubResourceApplyOptions", func() { + o := &client.SubResourceApplyOptions{ApplyOptions: client.ApplyOptions{FieldManager: "bar"}} + t := client.FieldOwner("foo") + t.ApplyToSubResourceApply(o) + Expect(o.FieldManager).To(Equal("foo")) + }) }) var _ = Describe("ForceOwnership", func() { @@ -440,6 +452,12 @@ var _ = Describe("ForceOwnership", func() { t.ApplyToApply(o) Expect(*o.Force).To(BeTrue()) }) + It("Should apply to SubResourceApplyOptions", func() { + o := &client.SubResourceApplyOptions{} + t := client.ForceOwnership + t.ApplyToSubResourceApply(o) + Expect(*o.Force).To(BeTrue()) + }) }) var _ = Describe("HasLabels", func() { diff --git a/pkg/client/patch.go b/pkg/client/patch.go index ec55861080..9bd0953fdc 100644 --- a/pkg/client/patch.go +++ b/pkg/client/patch.go @@ -28,7 +28,7 @@ import ( var ( // Apply uses server-side apply to patch the given object. // - // Deprecated: Use client.Client.Apply() instead. 
+	// Deprecated: Use client.Client.Apply() and client.Client.SubResource("subresource").Apply() instead.
	Apply Patch = applyPatch{}

	// Merge uses the raw object as a merge patch, without modifications.
diff --git a/pkg/client/typed_client.go b/pkg/client/typed_client.go
index 3bd762a638..66ae2e4a5c 100644
--- a/pkg/client/typed_client.go
+++ b/pkg/client/typed_client.go
@@ -304,3 +304,36 @@ func (c *typedClient) PatchSubResource(ctx context.Context, obj Object, subResou
		Do(ctx).
		Into(body)
}
+
+func (c *typedClient) ApplySubResource(ctx context.Context, obj runtime.ApplyConfiguration, subResource string, opts ...SubResourceApplyOption) error {
+	o, err := c.resources.getObjMeta(obj)
+	if err != nil {
+		return err
+	}
+
+	applyOpts := &SubResourceApplyOptions{}
+	applyOpts.ApplyOpts(opts)
+
+	body := obj
+	if applyOpts.SubResourceBody != nil {
+		body = applyOpts.SubResourceBody
+	}
+
+	req, err := apply.NewRequest(o, body)
+	if err != nil {
+		return fmt.Errorf("failed to create apply request: %w", err)
+	}
+
+	return req.
+		NamespaceIfScoped(o.namespace, o.isNamespaced()).
+		Resource(o.resource()).
+		Name(o.name).
+		SubResource(subResource).
+		VersionedParams(applyOpts.AsPatchOptions(), c.paramCodec).
+		Do(ctx).
+		// This is hacky; it is required because `Into` takes a `runtime.Object` and
+		// that is not implemented by the ApplyConfigurations. The generated clients
+		// don't have this problem because they deserialize into the api type, not the
+		// apply configuration: https://github.com/kubernetes/kubernetes/blob/22f5e01a37c0bc6a5f494dec14dd4e3688ee1d55/staging/src/k8s.io/client-go/gentype/type.go#L296-L317
+		Into(runtimeObjectFromApplyConfiguration(obj))
+}
diff --git a/pkg/client/unstructured_client.go b/pkg/client/unstructured_client.go
index e636c3beef..d2ea6d7a32 100644
--- a/pkg/client/unstructured_client.go
+++ b/pkg/client/unstructured_client.go
@@ -386,3 +386,35 @@ func (uc *unstructuredClient) PatchSubResource(ctx context.Context, obj Object,
	u.GetObjectKind().SetGroupVersionKind(gvk)
	return result
}
+
+func (uc *unstructuredClient) ApplySubResource(ctx context.Context, obj runtime.ApplyConfiguration, subResource string, opts ...SubResourceApplyOption) error {
+	unstructuredApplyConfig, ok := obj.(*unstructuredApplyConfiguration)
+	if !ok {
+		return fmt.Errorf("bug: unstructured client got an applyconfiguration that was not %T but %T", &unstructuredApplyConfiguration{}, obj)
+	}
+	o, err := uc.resources.getObjMeta(unstructuredApplyConfig.Unstructured)
+	if err != nil {
+		return err
+	}
+
+	applyOpts := &SubResourceApplyOptions{}
+	applyOpts.ApplyOpts(opts)
+
+	body := obj
+	if applyOpts.SubResourceBody != nil {
+		body = applyOpts.SubResourceBody
+	}
+	req, err := apply.NewRequest(o, body)
+	if err != nil {
+		return fmt.Errorf("failed to create apply request: %w", err)
+	}
+
+	return req.
+		NamespaceIfScoped(o.namespace, o.isNamespaced()).
+		Resource(o.resource()).
+		Name(o.name).
+		SubResource(subResource).
+		VersionedParams(applyOpts.AsPatchOptions(), uc.paramCodec).
+		Do(ctx).
+		Into(unstructuredApplyConfig.Unstructured)
+}
diff --git a/pkg/config/controller.go b/pkg/config/controller.go
index 3dafaef93b..5eea2965f6 100644
--- a/pkg/config/controller.go
+++ b/pkg/config/controller.go
@@ -79,7 +79,7 @@ type Controller struct {
	// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
	// priority queue.
	//
-	// Note: This flag is disabled by default until a future version. This feature is currently in beta.
+ // Note: This flag is enabled by default. // For more details, see: https://github.com/kubernetes-sigs/controller-runtime/issues/2374. UsePriorityQueue *bool diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index afa15aebec..853788d52f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -91,7 +91,7 @@ type TypedOptions[request comparable] struct { // UsePriorityQueue configures the controllers queue to use the controller-runtime provided // priority queue. // - // Note: This flag is disabled by default until a future version. This feature is currently in beta. + // Note: This flag is enabled by default. // For more details, see: https://github.com/kubernetes-sigs/controller-runtime/issues/2374. UsePriorityQueue *bool @@ -250,7 +250,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req } if options.RateLimiter == nil { - if ptr.Deref(options.UsePriorityQueue, false) { + if ptr.Deref(options.UsePriorityQueue, true) { options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second) } else { options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() @@ -259,7 +259,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req if options.NewQueue == nil { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { - if ptr.Deref(options.UsePriorityQueue, false) { + if ptr.Deref(options.UsePriorityQueue, true) { return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { o.Log = options.Logger.WithValues("controller", controllerName) o.RateLimiter = rateLimiter diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 335e6d830e..06138a476b 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -439,9 +439,9 @@ var _ = Describe("controller.Controller", func() { Expect(ok).To(BeTrue()) }) - It("should configure a priority queue if UsePriorityQueue is set", func() { + It("should configure a priority queue per default", func() { m, err := manager.New(cfg, manager.Options{ - Controller: config.Controller{UsePriorityQueue: ptr.To(true)}, + Controller: config.Controller{}, }) Expect(err).NotTo(HaveOccurred()) @@ -458,12 +458,13 @@ var _ = Describe("controller.Controller", func() { Expect(ok).To(BeTrue()) }) - It("should not configure a priority queue if UsePriorityQueue is not set", func() { + It("should not configure a priority queue if UsePriorityQueue is set to false", func() { m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("new-controller-17", m, controller.Options{ - Reconciler: rec, + Reconciler: rec, + UsePriorityQueue: ptr.To(false), }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controller/controllerutil/controllerutil.go b/pkg/controller/controllerutil/controllerutil.go index 0088f88e5d..0f12b934ee 100644 --- a/pkg/controller/controllerutil/controllerutil.go +++ b/pkg/controller/controllerutil/controllerutil.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "slices" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -501,10 +502,8 @@ type MutateFn func() error // It returns an indication of whether it updated the object's list of finalizers. 
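// A typical (illustrative) call site only issues an Update when the finalizer
// list actually changed, e.g.:
//
//	if controllerutil.AddFinalizer(obj, myFinalizer) {
//		if err := r.Update(ctx, obj); err != nil {
//			return ctrl.Result{}, err
//		}
//	}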
func AddFinalizer(o client.Object, finalizer string) (finalizersUpdated bool) { f := o.GetFinalizers() - for _, e := range f { - if e == finalizer { - return false - } + if slices.Contains(f, finalizer) { + return false } o.SetFinalizers(append(f, finalizer)) return true @@ -517,7 +516,7 @@ func RemoveFinalizer(o client.Object, finalizer string) (finalizersUpdated bool) length := len(f) index := 0 - for i := 0; i < length; i++ { + for i := range length { if f[i] == finalizer { continue } @@ -531,10 +530,5 @@ func RemoveFinalizer(o client.Object, finalizer string) (finalizersUpdated bool) // ContainsFinalizer checks an Object that the provided finalizer is present. func ContainsFinalizer(o client.Object, finalizer string) bool { f := o.GetFinalizers() - for _, e := range f { - if e == finalizer { - return true - } - } - return false + return slices.Contains(f, finalizer) } diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 49942186c0..98df84c56b 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "math" "sync" "sync/atomic" "time" @@ -206,6 +207,7 @@ func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + var nextItemReadyAt time.Time for { select { @@ -213,10 +215,10 @@ func (w *priorityqueue[T]) spin() { return case <-w.itemOrWaiterAdded: case <-nextReady: + nextReady = blockForever + nextItemReadyAt = time.Time{} } - nextReady = blockForever - func() { w.lock.Lock() defer w.lock.Unlock() @@ -227,39 +229,67 @@ func (w *priorityqueue[T]) spin() { // manipulating the tree from within Ascend might lead to panics, so // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] - w.queue.Ascend(func(item *item[T]) bool { - if item.ReadyAt != nil { - if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { - nextReady = w.tick(readyAt) - return false + + var key T + + // Items in the queue tree are sorted first by priority and second by readiness, so + // items with a lower priority might be ready further down in the queue. + // We iterate through the priorities high to low until we find a ready item + pivot := item[T]{ + Key: key, + AddedCounter: 0, + Priority: math.MaxInt, + ReadyAt: nil, + } + + for { + pivotChange := false + + w.queue.AscendGreaterOrEqual(&pivot, func(item *item[T]) bool { + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true } - if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key, item.Priority) - w.becameReady.Insert(item.Key) + + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + if nextItemReadyAt.After(*item.ReadyAt) || nextItemReadyAt.IsZero() { + nextReady = w.tick(readyAt) + nextItemReadyAt = *item.ReadyAt + } + + // Adjusting the pivot item moves the ascend to the next lower priority + pivot.Priority = item.Priority - 1 + pivotChange = true + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key, item.Priority) + w.becameReady.Insert(item.Key) + } } - } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. 
+ return true + } - // Item is locked, we can not hand it out - if w.locked.Has(item.Key) { - return true - } + w.metrics.get(item.Key, item.Priority) + w.locked.Insert(item.Key) + w.waiters.Add(-1) + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item - w.metrics.get(item.Key, item.Priority) - w.locked.Insert(item.Key) - w.waiters.Add(-1) - delete(w.items, item.Key) - toDelete = append(toDelete, item) - w.becameReady.Delete(item.Key) - w.get <- *item + return true + }) - return true - }) + if !pivotChange { + break + } + } for _, item := range toDelete { w.queue.Delete(item) @@ -290,9 +320,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.notifyItemOrWaiterAdded() - item := <-w.get - - return item.Key, item.Priority, w.shutdown.Load() + select { + case <-w.done: + // Return if the queue was shutdown while we were already waiting for an item here. + // For example controller workers are continuously calling GetWithPriority and + // GetWithPriority is blocking the workers if there are no items in the queue. + // If the controller and accordingly the queue is then shut down, without this code + // branch the controller workers remain blocked here and are unable to shut down. + var zero T + return zero, 0, true + case item := <-w.get: + return item.Key, item.Priority, w.shutdown.Load() + } } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { @@ -378,6 +417,9 @@ func (w *priorityqueue[T]) logState() { } func less[T comparable](a, b *item[T]) bool { + if a.Priority != b.Priority { + return a.Priority > b.Priority + } if a.ReadyAt == nil && b.ReadyAt != nil { return true } @@ -387,9 +429,6 @@ func less[T comparable](a, b *item[T]) bool { if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { return a.ReadyAt.Before(*b.ReadyAt) } - if a.Priority != b.Priority { - return a.Priority > b.Priority - } return a.AddedCounter < b.AddedCounter } @@ -417,4 +456,5 @@ type bTree[T any] interface { ReplaceOrInsert(item T) (_ T, _ bool) Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) + AscendGreaterOrEqual(pivot T, iterator btree.ItemIteratorG[T]) } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 13cf59b7e8..e18a6393eb 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -93,11 +93,10 @@ var _ = Describe("Controllerworkqueue", func() { q.AddWithOpts(AddOpts{}, "foo") q.AddWithOpts(AddOpts{}, "foo") - Consistently(q.Len).Should(Equal(1)) + Expect(q.Len()).To(Equal(1)) - cwq := q.(*priorityqueue[string]) - cwq.lockedLock.Lock() - Expect(cwq.locked.Len()).To(Equal(0)) + q.lockedLock.Lock() + Expect(q.locked.Len()).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) Expect(metrics.adds["test"]).To(Equal(1)) @@ -156,22 +155,13 @@ var _ = Describe("Controllerworkqueue", func() { }) It("returns an item only after after has passed", func() { - q, metrics := newQueue() + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() defer q.ShutDown() - now := time.Now().Round(time.Second) - nowLock := sync.Mutex{} - tick := make(chan time.Time) - - cwq := q.(*priorityqueue[string]) - cwq.now = func() time.Time { - nowLock.Lock() - defer nowLock.Unlock() - return now - } - cwq.tick = func(d time.Duration) <-chan time.Time { + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { 
Expect(d).To(Equal(time.Second)) - return tick + return originalTick(d) } retrievedItem := make(chan struct{}) @@ -186,10 +176,7 @@ var _ = Describe("Controllerworkqueue", func() { Consistently(retrievedItem).ShouldNot(BeClosed()) - nowLock.Lock() - now = now.Add(time.Second) - nowLock.Unlock() - tick <- now + forwardQueueTimeBy(time.Second) Eventually(retrievedItem).Should(BeClosed()) Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) @@ -197,6 +184,35 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) + It("returns high priority item that became ready before low priority item", func() { + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() + defer q.ShutDown() + + tickSetup := make(chan any) + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(time.Second)) + close(tickSetup) + return originalTick(d) + } + + lowPriority := -100 + highPriority := 0 + q.AddWithOpts(AddOpts{After: 0, Priority: &lowPriority}, "foo") + q.AddWithOpts(AddOpts{After: time.Second, Priority: &highPriority}, "prio") + + Eventually(tickSetup).Should(BeClosed()) + + forwardQueueTimeBy(1 * time.Second) + key, prio, _ := q.GetWithPriority() + + Expect(key).To(Equal("prio")) + Expect(prio).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{-100: 1, 0: 0})) + Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.retries["test"]).To(Equal(1)) + }) + It("returns an item to a waiter as soon as it has one", func() { q, metrics := newQueue() defer q.ShutDown() @@ -223,20 +239,11 @@ var _ = Describe("Controllerworkqueue", func() { }) It("returns multiple items with after in correct order", func() { - q, metrics := newQueue() + q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder() defer q.ShutDown() - now := time.Now().Round(time.Second) - nowLock := sync.Mutex{} - tick := make(chan time.Time) - - cwq := q.(*priorityqueue[string]) - cwq.now = func() time.Time { - nowLock.Lock() - defer nowLock.Unlock() - return now - } - cwq.tick = func(d time.Duration) <-chan time.Time { + originalTick := q.tick + q.tick = func(d time.Duration) <-chan time.Time { // What a bunch of bs. Deferring in here causes // ginkgo to deadlock, presumably because it // never returns after the defer. Not deferring @@ -254,7 +261,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(d).To(Or(Equal(200*time.Millisecond), Equal(time.Second))) }() <-done - return tick + return originalTick(d) } retrievedItem := make(chan struct{}) @@ -276,10 +283,7 @@ var _ = Describe("Controllerworkqueue", func() { Consistently(retrievedItem).ShouldNot(BeClosed()) - nowLock.Lock() - now = now.Add(time.Second) - nowLock.Unlock() - tick <- now + forwardQueueTimeBy(time.Second) Eventually(retrievedItem).Should(BeClosed()) Eventually(retrievedSecondItem).Should(BeClosed()) @@ -321,6 +325,32 @@ var _ = Describe("Controllerworkqueue", func() { Expect(isShutDown).To(BeTrue()) }) + It("Get from priority queue should get unblocked when the priority queue is shut down", func() { + q, _ := newQueue() + + getUnblocked := make(chan struct{}) + + go func() { + defer GinkgoRecover() + defer close(getUnblocked) + + item, priority, isShutDown := q.GetWithPriority() + Expect(item).To(Equal("")) + Expect(priority).To(Equal(0)) + Expect(isShutDown).To(BeTrue()) + }() + + // Verify the go routine above is now waiting for an item. 
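+		// GetWithPriority increments waiters before it blocks on the get channel,
+		// so observing a waiter count of one means the goroutine is parked there.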
+		Eventually(q.waiters.Load).Should(Equal(int64(1)))
+		Consistently(getUnblocked).ShouldNot(BeClosed())
+
+		// shut down
+		q.ShutDown()
+
+		// Verify the shutdown unblocked the goroutine.
+		Eventually(getUnblocked).Should(BeClosed())
+	})
+
 	It("items are included in Len() and the queueDepth metric once they are ready", func() {
 		q, metrics := newQueue()
 		defer q.ShutDown()
@@ -462,21 +492,12 @@ var _ = Describe("Controllerworkqueue", func() {
 	})

 	It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() {
-		q, metrics := newQueue()
+		q, metrics, forwardQueueTimeBy := newQueueWithTimeForwarder()
 		defer q.ShutDown()

-		now := time.Now().Round(time.Second)
-		nowLock := sync.Mutex{}
-		tick := make(chan time.Time)
-
-		cwq := q.(*priorityqueue[string])
-		cwq.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
-		cwq.now = func() time.Time {
-			nowLock.Lock()
-			defer nowLock.Unlock()
-			return now
-		}
-		cwq.tick = func(d time.Duration) <-chan time.Time {
+		q.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
+		originalTick := q.tick
+		q.tick = func(d time.Duration) <-chan time.Time {
 			done := make(chan struct{})
 			go func() {
 				defer GinkgoRecover()
@@ -485,7 +506,7 @@ var _ = Describe("Controllerworkqueue", func() {
 				Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond)))
 			}()
 			<-done
-			return tick
+			return originalTick(d)
 		}

 		retrievedItem := make(chan struct{})
@@ -504,22 +525,16 @@ var _ = Describe("Controllerworkqueue", func() {

 		// after 7 calls, the next When("bar") call will return 640ms.
 		for range 7 {
-			cwq.rateLimiter.When("bar")
+			q.rateLimiter.When("bar")
 		}

 		q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar")
 		Consistently(retrievedItem).ShouldNot(BeClosed())

-		nowLock.Lock()
-		now = now.Add(5 * time.Millisecond)
-		nowLock.Unlock()
-		tick <- now
+		forwardQueueTimeBy(5 * time.Millisecond)
 		Eventually(retrievedItem).Should(BeClosed())

 		Consistently(retrievedSecondItem).ShouldNot(BeClosed())

-		nowLock.Lock()
-		now = now.Add(635 * time.Millisecond)
-		nowLock.Unlock()
-		tick <- now
+		forwardQueueTimeBy(635 * time.Millisecond)
 		Eventually(retrievedSecondItem).Should(BeClosed())

 		Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
@@ -692,7 +707,31 @@ func TestFuzzPriorityQueue(t *testing.T) {
 	wg.Wait()
 }

-func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
+func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvider, forwardQueueTime func(time.Duration)) {
+	q, m := newQueue()
+
+	now := time.Now().Round(time.Second)
+	nowLock := sync.Mutex{}
+	tick := make(chan time.Time)
+
+	q.now = func() time.Time {
+		nowLock.Lock()
+		defer nowLock.Unlock()
+		return now
+	}
+	q.tick = func(d time.Duration) <-chan time.Time {
+		return tick
+	}
+
+	return q, m, func(d time.Duration) {
+		nowLock.Lock()
+		now = now.Add(d)
+		nowLock.Unlock()
+		tick <- now
+	}
+}
+
+func newQueue() (*priorityqueue[string], *fakeMetricsProvider) {
 	metrics := newFakeMetricsProvider()
 	q := New("test", func(o *Opts[string]) {
 		o.MetricProvider = metrics
@@ -710,7 +749,7 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
 		}
 		return upstreamTick(d)
 	}
-	return q, metrics
+	return q.(*priorityqueue[string]), metrics
 }

 type btreeInteractionValidator struct {
diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go
index ea79681862..7dd06957eb 100644
--- a/pkg/internal/controller/controller.go
+++ b/pkg/internal/controller/controller.go
@@ -459,6 +459,9 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request,
 	// resource to be synced.
 	log.V(5).Info("Reconciling")
 	result, err := c.Reconcile(ctx, req)
+	if result.Priority != nil {
+		priority = *result.Priority
+	}
 	switch {
 	case err != nil:
 		if errors.Is(err, reconcile.TerminalError(nil)) {
@@ -468,8 +471,8 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request,
 		}
 		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
 		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
-		if !result.IsZero() {
-			log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes requeuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
+		if result.RequeueAfter > 0 || result.Requeue { //nolint: staticcheck // We have to handle Requeue until it is removed
+			log.Info("Warning: Reconciler returned both a result with either RequeueAfter or Requeue set and a non-nil error. RequeueAfter and Requeue will always be ignored if the error is non-nil. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
 		}
 		log.Error(err, "Reconciler error")
 	case result.RequeueAfter > 0:
diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go
index 306e0b0126..6d62b80e22 100644
--- a/pkg/internal/controller/controller_test.go
+++ b/pkg/internal/controller/controller_test.go
@@ -745,7 +745,36 @@ var _ = Describe("controller", func() {
 		}}))
 	})

-	It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func(ctx SpecContext) {
+	It("should use the priority from Result when the reconciler requests a requeue", func(ctx SpecContext) {
+		q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
+		ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
+			return q
+		}
+
+		go func() {
+			defer GinkgoRecover()
+			Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
+		}()
+
+		q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)
+
+		By("Invoking Reconciler which will request a requeue")
+		fakeReconcile.AddResult(reconcile.Result{Requeue: true, Priority: ptr.To(99)}, nil)
+		Expect(<-reconciled).To(Equal(request))
+		Eventually(func() []priorityQueueAddition {
+			q.lock.Lock()
+			defer q.lock.Unlock()
+			return q.added
+		}).Should(Equal([]priorityQueueAddition{{
+			AddOpts: priorityqueue.AddOpts{
+				RateLimited: true,
+				Priority:    ptr.To(99),
+			},
+			items: []reconcile.Request{request},
+		}}))
+	})
+
+	It("should requeue a Request after a duration (but not rate-limited) if the Result sets RequeueAfter (regardless of Requeue)", func(ctx SpecContext) {
 		dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)}
 		ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
 			return dq
@@ -775,7 +804,7 @@ var _ = Describe("controller", func() {
 		Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
 	})

-	It("should retain the priority with RequeAfter", func(ctx SpecContext) {
+	It("should retain the priority with RequeueAfter", func(ctx SpecContext) {
 		q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
 		ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
 			return q
@@ -804,6 +833,35 @@ var _ = Describe("controller", func() {
 		}}))
 	})

+	It("should use the priority from Result with RequeueAfter", func(ctx SpecContext) {
+		q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
+		ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
+			return q
+		}
+
+		go func() {
+			defer GinkgoRecover()
+			Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
+		}()
+
+		q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)
+
+		By("Invoking Reconciler which will ask for RequeueAfter")
+		fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100, Priority: ptr.To(99)}, nil)
+		Expect(<-reconciled).To(Equal(request))
+		Eventually(func() []priorityQueueAddition {
+			q.lock.Lock()
+			defer q.lock.Unlock()
+			return q.added
+		}).Should(Equal([]priorityQueueAddition{{
+			AddOpts: priorityqueue.AddOpts{
+				After:    time.Millisecond * 100,
+				Priority: ptr.To(99),
+			},
+			items: []reconcile.Request{request},
+		}}))
+	})
+
 	It("should perform error behavior if error is not nil, regardless of RequeueAfter", func(ctx SpecContext) {
 		dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)}
 		ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
 			return dq
@@ -862,6 +920,35 @@ var _ = Describe("controller", func() {
 		}}))
 	})

+	It("should use the priority from Result when there was an error", func(ctx SpecContext) {
+		q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
+		ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
+			return q
+		}
+
+		go func() {
+			defer GinkgoRecover()
+			Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
+		}()
+
+		q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: ptr.To(10)}, request)
+
+		By("Invoking Reconciler which will return an error")
+		fakeReconcile.AddResult(reconcile.Result{Priority: ptr.To(99)}, errors.New("oups, I did it again"))
+		Expect(<-reconciled).To(Equal(request))
+		Eventually(func() []priorityQueueAddition {
+			q.lock.Lock()
+			defer q.lock.Unlock()
+			return q.added
+		}).Should(Equal([]priorityQueueAddition{{
+			AddOpts: priorityqueue.AddOpts{
+				RateLimited: true,
+				Priority:    ptr.To(99),
+			},
+			items: []reconcile.Request{request},
+		}}))
+	})
+
 	PIt("should return if the queue is shutdown", func() {
 		// TODO(community): write this test
 	})
diff --git a/pkg/log/zap/flags.go b/pkg/log/zap/flags.go
index 2c88ad42ab..4ebac57dcb 100644
--- a/pkg/log/zap/flags.go
+++ b/pkg/log/zap/flags.go
@@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-// Package zap contains helpers for setting up a new logr.Logger instance
-// using the Zap logging framework.
 package zap

 import (
diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index a9f91cbdd5..a2c3e5324d 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -446,13 +446,16 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {

 	// Start the leader election and all required runnables.
 	{
-		ctx, cancel := context.WithCancel(context.Background())
+		// Create a context that inherits all keys from the parent context
+		// but can be cancelled independently for leader election management
+		baseCtx := context.WithoutCancel(ctx)
+		leaderCtx, cancel := context.WithCancel(baseCtx)
 		cm.leaderElectionCancel = cancel
 		if leaderElector != nil {
 			// Start the leader elector process
 			go func() {
-				leaderElector.Run(ctx)
-				<-ctx.Done()
+				leaderElector.Run(leaderCtx)
+				<-leaderCtx.Done()
 				close(cm.leaderElectionStopped)
 			}()
 		} else {
diff --git a/pkg/reconcile/reconcile.go b/pkg/reconcile/reconcile.go
index c98b1864ef..88303ae781 100644
--- a/pkg/reconcile/reconcile.go
+++ b/pkg/reconcile/reconcile.go
@@ -44,6 +44,11 @@ type Result struct {
 	// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
 	// Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
 	RequeueAfter time.Duration
+
+	// Priority is the priority that will be used if the item gets re-enqueued (also if an error is returned).
+	// If Priority is not set the original Priority of the request is preserved.
+	// Note: Priority is only respected if the controller is using a priorityqueue.PriorityQueue.
+	Priority *int
 }

 // IsZero returns true if this result is empty.
@@ -174,7 +179,7 @@ type terminalError struct {
 	err error
 }

-// This function will return nil if te.err is nil.
+// Unwrap returns nil if te.err is nil.
 func (te *terminalError) Unwrap() error {
 	return te.err
 }
diff --git a/tools/setup-envtest/go.mod b/tools/setup-envtest/go.mod
index 5cb31d8bf2..15c64f8b57 100644
--- a/tools/setup-envtest/go.mod
+++ b/tools/setup-envtest/go.mod
@@ -10,7 +10,7 @@ require (
 	github.com/spf13/afero v1.12.0
 	github.com/spf13/pflag v1.0.6
 	go.uber.org/zap v1.27.0
-	k8s.io/apimachinery v0.34.0
+	k8s.io/apimachinery v0.34.1
 	sigs.k8s.io/yaml v1.6.0
 )

diff --git a/tools/setup-envtest/go.sum b/tools/setup-envtest/go.sum
index c9dcc6499b..dfc8e7cce2 100644
--- a/tools/setup-envtest/go.sum
+++ b/tools/setup-envtest/go.sum
@@ -46,7 +46,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0=
-k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw=
+k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4=
+k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw=
 sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=
 sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4=
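
The newQueueWithTimeForwarder helper above factors the fake-clock wiring out of the rate-limiting test: the queue reads time only through its injected now and tick hooks, and the returned forwardQueueTime closure advances the clock and delivers a tick in one step. The following is a self-contained sketch of that pattern, not part of the patch; the sleeper type and all names are illustrative.

package main

import (
	"fmt"
	"sync"
	"time"
)

// sleeper stands in for the queue: like the patched priorityqueue, it reads
// time only through injected now/tick hooks instead of the real clock.
type sleeper struct {
	now  func() time.Time
	tick func(d time.Duration) <-chan time.Time
}

func main() {
	var (
		mu   sync.Mutex
		now  = time.Now().Round(time.Second)
		tick = make(chan time.Time)
	)
	s := &sleeper{
		now:  func() time.Time { mu.Lock(); defer mu.Unlock(); return now },
		tick: func(time.Duration) <-chan time.Time { return tick },
	}

	// Code under test: wait until a fixed deadline on the fake clock.
	deadline := s.now().Add(5 * time.Millisecond)
	done := make(chan struct{})
	go func() {
		for s.now().Before(deadline) {
			<-s.tick(deadline.Sub(s.now()))
		}
		close(done)
	}()

	// Test side, as in forwardQueueTime: advance the clock under the lock,
	// then deliver a tick. The select covers the case where the waiter
	// already saw the new time and finished without needing the tick.
	mu.Lock()
	now = now.Add(5 * time.Millisecond)
	mu.Unlock()
	select {
	case tick <- now:
	case <-done:
	}
	<-done
	fmt.Println("woke up at", s.now())
}

The two forwardQueueTimeBy calls in the patched test (5ms, then 635ms) line up with the rate limiter's exponential schedule: NewTypedItemExponentialFailureRateLimiter with a 5ms base makes the Nth failure of an item wait 5ms·2^(N-1), so after seven prior When("bar") calls the eighth returns 640ms, of which 5ms has already elapsed by the time "foo" is retrieved.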
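
In pkg/manager/internal.go, the leader-election context is now derived from context.WithoutCancel(ctx) instead of context.Background(). context.WithoutCancel (standard library, Go 1.21+) returns a context that keeps the parent's values but ignores its cancellation, so loggers, trace metadata, and other values attached by the caller now reach the leader elector while its shutdown remains driven solely by cm.leaderElectionCancel. A standalone sketch of the behavior; the key and values are illustrative:

package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

func main() {
	parent, stop := context.WithCancel(context.WithValue(context.Background(), ctxKey{}, "request-id"))

	// Detach cancellation but keep values, then attach an independent cancel,
	// mirroring the baseCtx/leaderCtx construction in the patch.
	leaderCtx, cancel := context.WithCancel(context.WithoutCancel(parent))
	defer cancel()

	stop() // cancelling the parent...

	fmt.Println(parent.Err())              // context canceled
	fmt.Println(leaderCtx.Err())           // <nil>: leaderCtx is unaffected
	fmt.Println(leaderCtx.Value(ctxKey{})) // request-id: values still flow through
}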
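
The new reconcile.Result.Priority field lets a Reconciler choose the priority its own requeue is enqueued with; per the doc comment above it is only respected when the controller is backed by a priorityqueue.PriorityQueue, and a nil Priority preserves the priority the request was dequeued with. A minimal sketch of a reconciler using it; demoReconciler and the chosen values are illustrative, not part of the patch:

package demo

import (
	"context"
	"time"

	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// demoReconciler is a hypothetical reconciler, used only to illustrate the field.
type demoReconciler struct{}

var _ reconcile.Reconciler = demoReconciler{}

func (demoReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	// ... reconcile the object ...

	// Retry in 30s, demoted below the default priority of 0 so requeues for
	// fresh cluster events are served first (the priority queue pops higher
	// values before lower ones). Leaving Priority nil would keep the
	// request's original priority.
	return reconcile.Result{
		RequeueAfter: 30 * time.Second,
		Priority:     ptr.To(-1),
	}, nil
}

Because reconcileHandler copies a non-nil Result.Priority into the AddWithOpts call on every requeue path (rate-limited requeue, RequeueAfter, and the error case, as the three new controller tests verify), this also explains the reworded warning in controller.go: it now keys on RequeueAfter/Requeue rather than !result.IsZero(), since a Result that only carries a Priority is a legitimate companion to a non-nil error rather than a mistake.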