Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add a metric to track job creation to pod creation time.
  • Loading branch information
Lei Yao committed Jan 15, 2025
commit 86e2aafd909f080cb82e8efecbf046030e9d3a43
14 changes: 9 additions & 5 deletions clusterloader2/pkg/framework/client/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,9 @@ func WaitForDeleteNamespace(c clientset.Interface, namespace string, timeout tim
return wait.PollImmediate(defaultNamespaceDeletionInterval, timeout, retryWaitFunc)
}

// ListEvents retrieves events for the object with the given name.
func ListEvents(c clientset.Interface, namespace string, name string, options ...*APICallOptions) (obj *apiv1.EventList, err error) {
func ListEventsWithOptions(c clientset.Interface, namespace string, listOptions metav1.ListOptions, options ...*APICallOptions) (obj *apiv1.EventList, err error) {
getFunc := func() error {
obj, err = c.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{
FieldSelector: "involvedObject.name=" + name,
})
obj, err = c.CoreV1().Events(namespace).List(context.TODO(), listOptions)
return err
}
if err := RetryWithExponentialBackOff(RetryFunction(getFunc, options...)); err != nil {
Expand All @@ -265,6 +262,13 @@ func ListEvents(c clientset.Interface, namespace string, name string, options ..
return obj, nil
}

// ListEvents retrieves events for the object with the given name.
func ListEvents(c clientset.Interface, namespace string, name string, options ...*APICallOptions) (obj *apiv1.EventList, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we change the order of these two functions, it will make resolve conflicts easier

return ListEventsWithOptions(c, namespace, metav1.ListOptions{
FieldSelector: "involvedObject.name=" + name,
}, options...)
}

// DeleteStorageClass deletes storage class with given name.
func DeleteStorageClass(c clientset.Interface, name string) error {
deleteFunc := func() error {
Expand Down
37 changes: 37 additions & 0 deletions clusterloader2/pkg/framework/client/objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package client

import (
"context"
"errors"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"
)

func TestIsResourceQuotaError(t *testing.T) {
Expand Down Expand Up @@ -155,3 +159,36 @@ func TestKindPluralization(t *testing.T) {
})
}
}

func TestListEventsWithOptions(t *testing.T) {
namespace := "default"
event1 := &corev1.Event{
InvolvedObject: corev1.ObjectReference{
Name: "object1",
Namespace: namespace,
},
Message: "Event 1 message",
}
event2 := &corev1.Event{
InvolvedObject: corev1.ObjectReference{
Name: "object2",
Namespace: namespace,
},
Message: "Event 2 message",
}
client := fake.NewSimpleClientset()
client.CoreV1().Events(namespace).Create(context.TODO(), event1, metav1.CreateOptions{})
client.CoreV1().Events(namespace).Create(context.TODO(), event2, metav1.CreateOptions{})

events, err := ListEvents(client, namespace, "object1")
if err != nil {
t.Fatalf("Unexpected error from ListEvents()\n%v", err)
return
}
if len(events.Items) != 1 {
t.Fatalf("Expect 1 events, got %d", len(events.Items))
}
if events.Items[0].InvolvedObject.Name != "object1" {
t.Errorf("Expect object1, got %q", events.Items[0].InvolvedObject.Name)
}
}
10 changes: 10 additions & 0 deletions clusterloader2/pkg/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/perf-tests/clusterloader2/pkg/config"
"k8s.io/perf-tests/clusterloader2/pkg/errors"
Expand Down Expand Up @@ -95,6 +96,15 @@ func newFramework(clusterConfig *config.ClusterConfig, clientsNumber int, kubeCo
return &f, nil
}

func NewFakeFramework(fakeClient clientset.Interface) *Framework {
return &Framework{
automanagedNamespaces: make(map[string]bool),
clientSets: &MultiClientSet{
clients: []clientset.Interface{fakeClient},
},
}
}

// GetAutomanagedNamespacePrefix returns automanaged namespace prefix.
func (f *Framework) GetAutomanagedNamespacePrefix() string {
return f.automanagedNamespacePrefix
Expand Down
36 changes: 36 additions & 0 deletions clusterloader2/pkg/measurement/common/job_lifecycle_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/perf-tests/clusterloader2/pkg/framework/client"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
Expand Down Expand Up @@ -60,6 +61,8 @@ func createJobLifecycleLatencyMeasurement() measurement.Measurement {
selector: util.NewObjectSelector(),
jobStateEntries: measurementutil.NewObjectTransitionTimes(jobLifecycleLatencyMeasurementName),
eventQueue: workqueue.New(),
podCreationTime: measurementutil.NewPodCreationEventTimes(),
eventTicker: time.NewTicker(time.Minute),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that the measurement will run very 1 minute to collect data right? I wonder if we should do second for more fine-grained result? Or allow users to customize the frequency with a variable?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong, but the events are stored in ETCD for a while, every time we list, it will return all the events in its cache. As long as the interval is smaller than the life span the cache, it should not miss anything.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked to Anson, I'm working on a new version that uses informer of pods instead of ListEvents.

}
}

Expand All @@ -69,6 +72,8 @@ type jobLifecycleLatencyMeasurement struct {
stopCh chan struct{}
eventQueue *workqueue.Type
jobStateEntries *measurementutil.ObjectTransitionTimes
podCreationTime *measurementutil.PodCreationEventTimes
eventTicker *time.Ticker
}

// Execute supports two actions:
Expand Down Expand Up @@ -130,6 +135,7 @@ func (p *jobLifecycleLatencyMeasurement) start(c clientset.Interface) error {
p.addEvent,
)
go p.processEvents()
go measurementutil.RunEveryTick(p.eventTicker, p.getFuncToListJobEvents(c), p.stopCh)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or do we have to measure periodically? or it can be event driven

return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
}

Expand Down Expand Up @@ -222,6 +228,12 @@ func (p *jobLifecycleLatencyMeasurement) gather(identifier string, timeout time.
}
p.stop()
jobLifecycleLatency := p.jobStateEntries.CalculateTransitionsLatency(jobLifecycleTransitions, measurementutil.MatchAll)
jobCreationTimes := make(map[string]time.Time)
for jobName := range p.jobStateEntries.Keys() {
jobCreationTimes[jobName], _ = p.jobStateEntries.Get(jobName, jobCreated)
}
podCreationTime := p.podCreationTime.CalculateLatency(jobCreationTimes)
jobLifecycleLatency["create_to_pod_start"] = &podCreationTime
content, jsonErr := util.PrettyPrintJSON(measurementutil.LatencyMapToPerfData(jobLifecycleLatency))
if jsonErr != nil {
return nil, jsonErr
Expand All @@ -234,3 +246,27 @@ func (p *jobLifecycleLatencyMeasurement) gather(identifier string, timeout time.
func createMetaNamespaceKey(namespace, name string) string {
return namespace + "/" + name
}

func (p *jobLifecycleLatencyMeasurement) getFuncToListJobEvents(c clientset.Interface) func() {
return func() {
klog.V(2).Infof("%s: list job events", p)
options := metav1.ListOptions{
FieldSelector: "involvedObject.kind=Job",
}
events, err := client.ListEventsWithOptions(c, p.selector.Namespace, options)
if err != nil {
klog.Errorf("Failed to list events: %v", err)
return
}
for _, event := range events.Items {
key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name)
if !p.jobStateEntries.Exists(key) {
continue
}
if event.Reason != "SuccessfulCreate" {
continue
}
p.podCreationTime.Set(key, &event)
}
}
}
Loading