Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions pkg/framework/plugins/extender/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package extender

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"net/http"
"os"
)

// EvictionRequest is the JSON request body sent to the external extender.
// DecideEviction populates it with the pod and node it was called with.
type EvictionRequest struct {
// Pod is the pod being considered for eviction; serialized under "pod".
Pod *v1.Pod `json:"pod"`
// Node is the node passed alongside the pod; serialized under "node".
Node *v1.Node `json:"node"`
}

// EvictionResponse is the JSON response body returned by the external extender.
type EvictionResponse struct {
// Allow is the extender's decision; DecideEviction returns it as-is when
// Error is empty.
Allow bool `json:"allow"`
// Reason is an optional explanation returned together with Allow.
Reason string `json:"reason,omitempty"`
// Error, when non-empty, makes DecideEviction treat the reply as a logical
// error from the extender instead of using Allow.
Error string `json:"error,omitempty"`
}

// ExtenderClient is an HTTP client for the external eviction-decision service.
// Instances are built by NewExtenderClient.
type ExtenderClient struct {
baseURL string // urlPrefix + decisionVerb
// httpClient performs the POST requests; its timeout comes from the
// extender configuration.
httpClient *http.Client
// ignorable, when true, makes DecideEviction allow the eviction if the
// service is unreachable, answers with a non-200 status, or reports a
// logical error.
ignorable bool
// failPolicy holds the configured fail policy ("Deny" when the config
// leaves it empty). DecideEviction does not read it.
failPolicy string
// logger carries an "extender" key set to baseURL.
logger klog.Logger
}

// NewExtenderClient builds an ExtenderClient from an ExternalExtender config.
//
// The config must be non-nil and carry a non-empty URLPrefix, a non-empty
// DecisionVerb and a positive HTTPTimeout; otherwise an error is returned.
// The request URL is URLPrefix (minus one trailing "/") joined to
// DecisionVerb with a single "/". When EnableHTTPS is set, a TLS config is
// built from extenderConfig.TLSConfig; a failure there is returned wrapped,
// so callers can inspect the cause with errors.Is / errors.As. An empty
// FailPolicy defaults to "Deny".
func NewExtenderClient(extenderConfig *ExternalExtender, logger klog.Logger) (*ExtenderClient, error) {
	switch {
	case extenderConfig == nil:
		return nil, fmt.Errorf("extender config is nil")
	case extenderConfig.URLPrefix == "":
		return nil, fmt.Errorf("urlPrefix is required")
	case extenderConfig.DecisionVerb == "":
		return nil, fmt.Errorf("decisionVerb is required")
	case extenderConfig.HTTPTimeout.Duration <= 0:
		return nil, fmt.Errorf("httpTimeout must be > 0")
	}

	baseURL := fmt.Sprintf("%s/%s", trimSuffix(extenderConfig.URLPrefix, "/"), extenderConfig.DecisionVerb)

	// tlsConfig stays nil for plain HTTP; the transport then uses its defaults.
	var tlsConfig *tls.Config
	if extenderConfig.EnableHTTPS {
		var err error
		tlsConfig, err = buildTLSConfig(extenderConfig.TLSConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to build TLS config: %w", err)
		}
	}

	return &ExtenderClient{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: extenderConfig.HTTPTimeout.Duration,
			Transport: &http.Transport{
				TLSClientConfig: tlsConfig,
			},
		},
		ignorable:  extenderConfig.Ignorable,
		failPolicy: defaultIfEmpty(extenderConfig.FailPolicy, "Deny"),
		logger:     logger.WithValues("extender", baseURL),
	}, nil
}

// DecideEviction asks the external service whether pod may be evicted.
//
// It POSTs a JSON-encoded EvictionRequest{Pod, Node} to the client's base URL
// and decodes the reply as an EvictionResponse. The results are
// (allow, reason, err):
//
//   - 200 reply with an empty Error field: the service's Allow and Reason.
//   - 200 reply with a non-empty Error field: (true, note, nil) when the
//     client is ignorable, otherwise (false, <Error text>, nil).
//   - transport failure or non-200 status: (true, note, nil) when the client
//     is ignorable, otherwise (false, "", err).
//   - failure to marshal the request, build it, read the body, or decode the
//     reply: (false, "", err) regardless of the ignorable flag.
//
// Underlying errors are wrapped with %w so callers can detect, for example,
// context cancellation or deadline expiry with errors.Is.
func (c *ExtenderClient) DecideEviction(ctx context.Context, pod *v1.Pod, node *v1.Node) (bool, string, error) {
	body, err := json.Marshal(EvictionRequest{Pod: pod, Node: node})
	if err != nil {
		return false, "", fmt.Errorf("failed to marshal request: %w", err)
	}

	httpRequest, err := http.NewRequestWithContext(ctx, "POST", c.baseURL, bytes.NewReader(body))
	if err != nil {
		return false, "", fmt.Errorf("failed to create request: %w", err)
	}
	httpRequest.Header.Set("Content-Type", "application/json")

	c.logger.V(4).Info("Calling external decision service", "url", c.baseURL, "pod", klog.KObj(pod))

	resp, err := c.httpClient.Do(httpRequest)
	if err != nil {
		c.logger.Error(err, "External decision service unreachable")
		if c.ignorable {
			c.logger.V(2).Info("ignorable=true, allowing eviction by default")
			return true, "extender unreachable (ignorable)", nil
		}
		return false, "", fmt.Errorf("HTTP request failed: %w", err)
	}
	defer resp.Body.Close()

	// The body is read before the status check so it can be included in the
	// non-200 error and log output.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, "", fmt.Errorf("failed to read response body: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		c.logger.Error(nil, "External decision service returned error", "statusCode", resp.StatusCode, "body", string(respBody))
		if c.ignorable {
			c.logger.V(2).Info("ignorable=true, allowing eviction by default")
			return true, "extender error (ignorable)", nil
		}
		return false, "", fmt.Errorf("non-200 status code: %d, body: %s", resp.StatusCode, string(respBody))
	}

	var extResp EvictionResponse
	if err := json.Unmarshal(respBody, &extResp); err != nil {
		return false, "", fmt.Errorf("failed to unmarshal response: %w, body: %s", err, string(respBody))
	}

	// A non-empty Error field is a failure reported by the service itself; it
	// is surfaced through the reason string, not through the error result.
	if extResp.Error != "" {
		c.logger.V(2).Info("External decision service returned logical error", "error", extResp.Error)
		if c.ignorable {
			return true, "extender logical error (ignorable)", nil
		}
		return false, extResp.Error, nil
	}

	c.logger.V(3).Info("External decision received", "allow", extResp.Allow, "reason", extResp.Reason)

	return extResp.Allow, extResp.Reason, nil
}

// trimSuffix returns s with one trailing occurrence of suffix removed.
//
// s is returned unchanged when suffix is empty, when s does not end with
// suffix, or when s is exactly suffix (so "/" stays "/" and never becomes
// the empty string).
func trimSuffix(s, suffix string) string {
	// cut is the index at which the suffix would begin; a value <= 0 means s
	// is no longer than suffix, in which case nothing is ever trimmed.
	cut := len(s) - len(suffix)
	if suffix == "" || cut <= 0 {
		return s
	}
	if s[cut:] != suffix {
		return s
	}
	return s[:cut]
}

// defaultIfEmpty returns s, or def when s is the empty string.
func defaultIfEmpty(s, def string) string {
	if len(s) > 0 {
		return s
	}
	return def
}

// buildTLSConfig turns the extender TLS settings into a *tls.Config.
//
// Server certificate verification is always left enabled
// (InsecureSkipVerify keeps its zero value, false). With a nil tlsConfig or
// an empty CACertFile the returned config has no RootCAs set. Otherwise the
// PEM file named by CACertFile is loaded into a fresh pool that becomes
// RootCAs.
//
// An error is returned when the CA file cannot be read (wrapped with %w, so
// errors.Is(err, fs.ErrNotExist) and similar checks work) or when it
// contains no parsable PEM certificate.
func buildTLSConfig(tlsConfig *ExtenderTLSConfig) (*tls.Config, error) {
	config := &tls.Config{}
	if tlsConfig == nil || tlsConfig.CACertFile == "" {
		return config, nil
	}

	caCert, err := os.ReadFile(tlsConfig.CACertFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read CA cert file: %w", err)
	}

	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("failed to parse CA certificate")
	}
	config.RootCAs = caCertPool

	return config, nil
}
27 changes: 27 additions & 0 deletions pkg/framework/plugins/extender/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package extender

import (
"k8s.io/apimachinery/pkg/runtime"
)

// addDefaultingFuncs registers this package's defaulting functions with
// scheme by delegating to RegisterDefaults and returns its error unchanged.
// NOTE(review): RegisterDefaults is not defined in the visible sources;
// presumably it is produced by defaulter-gen — confirm.
func addDefaultingFuncs(scheme *runtime.Scheme) error {
return RegisterDefaults(scheme)
}

// SetDefaults_ExternalDecisionArgs is the defaulting hook for the
// ExternalDecision plugin arguments. It is currently a no-op: obj is left
// untouched.
// TODO: the final default values are still to be discussed in the community.
func SetDefaults_ExternalDecisionArgs(obj runtime.Object) {
}
16 changes: 16 additions & 0 deletions pkg/framework/plugins/extender/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// +k8s:defaulter-gen=TypeMeta

package extender
162 changes: 162 additions & 0 deletions pkg/framework/plugins/extender/extender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package extender

import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
)

// PluginName is the name reported by ExternalDecision.Name; it is also used
// as the eviction StrategyName and as the "plugin" logger value.
const PluginName = "ExternalDecision"

// ExternalDecision is a Deschedule plugin that asks an external HTTP service,
// through an ExtenderClient, whether each candidate pod may be evicted.
type ExternalDecision struct {
// logger is tagged with the plugin name.
logger klog.Logger
// handle gives access to the framework's evictor and pod lister.
handle frameworktypes.Handle
// args holds the plugin arguments; handleError reads
// args.Extender.FailPolicy from it.
args *ExternalDecisionArgs
// podFilter selects the pods that are eviction candidates.
podFilter podutil.FilterFunc
// client talks to the external decision service.
client *ExtenderClient
}

// Name returns the plugin name, PluginName.
func (d *ExternalDecision) Name() string {
return PluginName
}

// Compile-time check that *ExternalDecision implements DeschedulePlugin.
var _ frameworktypes.DeschedulePlugin = &ExternalDecision{}

// New builds the ExternalDecision plugin from its arguments and the framework
// handle.
//
// args must be an *ExternalDecisionArgs with a non-nil Extender section. An
// empty Extender.FailPolicy is defaulted to "Deny" in place. The remaining
// extender settings (URL prefix, decision verb, timeout, TLS) are validated
// by NewExtenderClient.
//
// Every configuration or initialization problem is reported as a non-nil
// error; New never returns a nil plugin together with a nil error.
func New(ctx context.Context, args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
	extArgs, ok := args.(*ExternalDecisionArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type ExternalDecisionArgs, got %T", args)
	}
	if extArgs.Extender == nil {
		return nil, fmt.Errorf("extender configuration is required")
	}
	if extArgs.Extender.FailPolicy == "" {
		extArgs.Extender.FailPolicy = "Deny"
	}

	client, err := NewExtenderClient(extArgs.Extender, klog.FromContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("error initializing extender client: %w", err)
	}

	logger := klog.FromContext(ctx).WithValues("plugin", PluginName)

	// Candidate pods must pass both the evictor's Filter and its
	// PreEvictionFilter.
	podFilter, err := podutil.NewOptions().
		WithFilter(podutil.WrapFilterFuncs(handle.Evictor().Filter, handle.Evictor().PreEvictionFilter)).
		BuildFilterFunc()
	if err != nil {
		return nil, fmt.Errorf("error initializing pod filter function: %v", err)
	}

	return &ExternalDecision{
		logger: logger,
		handle: handle,
		// args must be stored: handleError reads args.Extender.FailPolicy.
		args:      extArgs,
		podFilter: podFilter,
		client:    client,
	}, nil
}

// Deschedule implements the Deschedule extension point for the
// ExternalDecision plugin.
//
// It first lists, for every node in nodes, the pods that pass the plugin's
// pod filter. A listing failure aborts the run with a Status carrying the
// error. Each candidate pod is then submitted to the external decision
// service together with its node, and only a pod the service explicitly
// allows is handed to the evictor:
//
//   - a service error is passed to handleError and the pod is skipped;
//   - a denied pod is skipped;
//   - on EvictionNodeLimitError processing moves on to the next pod;
//   - on EvictionTotalLimitError the run stops, returning nil;
//   - any other eviction error is logged and processing continues.
//
// NOTE(review): handleError only logs, so failPolicy=Allow does not cause an
// eviction here when the service call fails — confirm the intended semantics.
func (d *ExternalDecision) Deschedule(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
	logger := klog.FromContext(klog.NewContext(ctx, d.logger)).WithValues("ExtensionPoint", frameworktypes.DescheduleExtensionPoint)

	// Collect candidate pods and index the nodes by name so each pod can be
	// sent to the extender along with the node it runs on.
	var candidates []*v1.Pod
	nodeMap := make(map[string]*v1.Node, len(nodes))
	for _, node := range nodes {
		logger.V(2).Info("Processing node", "node", klog.KObj(node))
		pods, err := podutil.ListAllPodsOnANode(node.Name, d.handle.GetPodsAssignedToNodeFunc(), d.podFilter)
		if err != nil {
			return &frameworktypes.Status{
				Err: fmt.Errorf("error listing pods on node %s: %v", node.Name, err),
			}
		}

		nodeMap[node.Name] = node
		candidates = append(candidates, pods...)
	}

	// Evaluate each candidate and evict it only if the extender allows it.
	for _, pod := range candidates {
		node, exists := nodeMap[pod.Spec.NodeName]
		if !exists {
			logger.V(4).Info("Node not found in map, skipping pod", "pod", klog.KObj(pod), "node", pod.Spec.NodeName)
			continue
		}

		shouldEvict, reason, err := d.evaluatePod(ctx, pod, node)
		if err != nil {
			d.handleError(logger, pod, err)
			continue
		}
		if !shouldEvict {
			logger.V(3).Info("Eviction denied by external decision", "pod", klog.KObj(pod), "reason", reason)
			continue
		}

		err = d.handle.Evictor().Evict(ctx, pod, evictions.EvictOptions{StrategyName: PluginName})
		if err == nil {
			continue
		}
		switch err.(type) {
		case *evictions.EvictionNodeLimitError:
			// This pod's node hit its limit; pods on other nodes may still go.
			continue
		case *evictions.EvictionTotalLimitError:
			// Nothing more can be evicted in this run.
			return nil
		default:
			logger.Error(err, "eviction failed")
		}
	}

	return nil
}

// evaluatePod asks the external service, through the plugin's client, whether
// pod (running on node) may be evicted.
//
// It returns the service's allow flag and reason. When the client call fails
// it returns (false, "", err), with the client's error wrapped via %w so the
// cause stays inspectable with errors.Is / errors.As.
func (d *ExternalDecision) evaluatePod(ctx context.Context, pod *v1.Pod, node *v1.Node) (bool, string, error) {
	allow, reason, err := d.client.DecideEviction(ctx, pod, node)
	if err != nil {
		return false, "", fmt.Errorf("extender call failed: %w", err)
	}
	return allow, reason, nil
}

// handleError logs an error returned by the external decision service and
// then logs which fail policy applies to the pod.
//
// The function only logs: it returns nothing and performs no eviction, so
// what actually happens to the pod is decided by the caller.
//
// The policy is read from the plugin arguments when they are present and
// falls back to the policy stored on the client, so a plugin value without
// args (or without an Extender section) does not panic. An empty policy is
// treated as "Deny"; an unknown value is reported and treated as "Deny".
func (d *ExternalDecision) handleError(logger klog.Logger, pod *v1.Pod, err error) {
	logger.Error(err, "External decision service error", "pod", klog.KObj(pod))

	var failPolicy string
	switch {
	case d.args != nil && d.args.Extender != nil:
		failPolicy = d.args.Extender.FailPolicy
	case d.client != nil:
		failPolicy = d.client.failPolicy
	}

	switch failPolicy {
	case "Allow":
		logger.V(2).Info("failPolicy=Allow, proceeding with eviction", "pod", klog.KObj(pod))
	case "Ignore":
		logger.V(2).Info("failPolicy=Ignore, skipping pod", "pod", klog.KObj(pod))
	case "Deny", "":
		logger.V(2).Info("failPolicy=Deny, blocking eviction", "pod", klog.KObj(pod))
	default:
		logger.Error(nil, "Unknown failPolicy, defaulting to Deny", "failPolicy", failPolicy, "pod", klog.KObj(pod))
	}
}
Loading