diff --git a/clusterloader2/pkg/framework/client/objects.go b/clusterloader2/pkg/framework/client/objects.go index 7081b08105..9315048e07 100644 --- a/clusterloader2/pkg/framework/client/objects.go +++ b/clusterloader2/pkg/framework/client/objects.go @@ -251,12 +251,10 @@ func WaitForDeleteNamespace(c clientset.Interface, namespace string, timeout tim return wait.PollImmediate(defaultNamespaceDeletionInterval, timeout, retryWaitFunc) } -// ListEvents retrieves events for the object with the given name. -func ListEvents(c clientset.Interface, namespace string, name string, options ...*APICallOptions) (obj *apiv1.EventList, err error) { +// ListEventsWithOptions retrieves events in the given namespace matching the provided list options. +func ListEventsWithOptions(c clientset.Interface, namespace string, listOptions metav1.ListOptions, options ...*APICallOptions) (obj *apiv1.EventList, err error) { getFunc := func() error { - obj, err = c.CoreV1().Events(namespace).List(context.TODO(), metav1.ListOptions{ - FieldSelector: "involvedObject.name=" + name, - }) + obj, err = c.CoreV1().Events(namespace).List(context.TODO(), listOptions) return err } if err := RetryWithExponentialBackOff(RetryFunction(getFunc, options...)); err != nil { @@ -265,6 +262,13 @@ func ListEvents(c clientset.Interface, namespace string, name string, options .. return obj, nil } +// ListEvents retrieves events for the object with the given name. +func ListEvents(c clientset.Interface, namespace string, name string, options ...*APICallOptions) (obj *apiv1.EventList, err error) { + return ListEventsWithOptions(c, namespace, metav1.ListOptions{ + FieldSelector: "involvedObject.name=" + name, + }, options...) +} + // DeleteStorageClass deletes storage class with given name. 
func DeleteStorageClass(c clientset.Interface, name string) error { deleteFunc := func() error { diff --git a/clusterloader2/pkg/framework/client/objects_test.go b/clusterloader2/pkg/framework/client/objects_test.go index 481225199b..e8ec0d13ec 100644 --- a/clusterloader2/pkg/framework/client/objects_test.go +++ b/clusterloader2/pkg/framework/client/objects_test.go @@ -17,13 +17,17 @@ limitations under the License. package client import ( + "context" "errors" "fmt" "testing" "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/fake" ) func TestIsResourceQuotaError(t *testing.T) { @@ -155,3 +159,36 @@ func TestKindPluralization(t *testing.T) { }) } } + +func TestListEventsWithOptions(t *testing.T) { + namespace := "default" + event1 := &corev1.Event{ + InvolvedObject: corev1.ObjectReference{ + Name: "object1", + Namespace: namespace, + }, + Message: "Event 1 message", + } + event2 := &corev1.Event{ + InvolvedObject: corev1.ObjectReference{ + Name: "object2", + Namespace: namespace, + }, + Message: "Event 2 message", + } + client := fake.NewSimpleClientset() + client.CoreV1().Events(namespace).Create(context.TODO(), event1, metav1.CreateOptions{}) + client.CoreV1().Events(namespace).Create(context.TODO(), event2, metav1.CreateOptions{}) + + events, err := ListEvents(client, namespace, "object1") + if err != nil { + t.Fatalf("Unexpected error from ListEvents()\n%v", err) + return + } + if len(events.Items) != 1 { + t.Fatalf("Expect 1 events, got %d", len(events.Items)) + } + if events.Items[0].InvolvedObject.Name != "object1" { + t.Errorf("Expect object1, got %q", events.Items[0].InvolvedObject.Name) + } +} diff --git a/clusterloader2/pkg/framework/framework.go b/clusterloader2/pkg/framework/framework.go index a87b348939..ef42deb95e 100644 --- 
a/clusterloader2/pkg/framework/framework.go +++ b/clusterloader2/pkg/framework/framework.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/perf-tests/clusterloader2/pkg/config" "k8s.io/perf-tests/clusterloader2/pkg/errors" @@ -95,6 +96,15 @@ func newFramework(clusterConfig *config.ClusterConfig, clientsNumber int, kubeCo return &f, nil } +func NewFakeFramework(fakeClient clientset.Interface) *Framework { + return &Framework{ + automanagedNamespaces: make(map[string]bool), + clientSets: &MultiClientSet{ + clients: []clientset.Interface{fakeClient}, + }, + } +} + // GetAutomanagedNamespacePrefix returns automanaged namespace prefix. func (f *Framework) GetAutomanagedNamespacePrefix() string { return f.automanagedNamespacePrefix diff --git a/clusterloader2/pkg/measurement/common/job_lifecycle_latency.go b/clusterloader2/pkg/measurement/common/job_lifecycle_latency.go index 4b290744dc..34d6f6c4c0 100644 --- a/clusterloader2/pkg/measurement/common/job_lifecycle_latency.go +++ b/clusterloader2/pkg/measurement/common/job_lifecycle_latency.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/perf-tests/clusterloader2/pkg/framework/client" "k8s.io/perf-tests/clusterloader2/pkg/measurement" measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util" "k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer" @@ -60,6 +61,8 @@ func createJobLifecycleLatencyMeasurement() measurement.Measurement { selector: util.NewObjectSelector(), jobStateEntries: measurementutil.NewObjectTransitionTimes(jobLifecycleLatencyMeasurementName), eventQueue: workqueue.New(), + podCreationTime: measurementutil.NewPodCreationEventTimes(), + eventTicker: time.NewTicker(time.Minute), } } @@ -69,6 +72,8 @@ type jobLifecycleLatencyMeasurement 
struct { stopCh chan struct{} eventQueue *workqueue.Type jobStateEntries *measurementutil.ObjectTransitionTimes + podCreationTime *measurementutil.PodCreationEventTimes + eventTicker *time.Ticker } // Execute supports two actions: @@ -130,6 +135,7 @@ func (p *jobLifecycleLatencyMeasurement) start(c clientset.Interface) error { p.addEvent, ) go p.processEvents() + go measurementutil.RunEveryTick(p.eventTicker, p.getFuncToListJobEvents(c), p.stopCh) return informer.StartAndSync(i, p.stopCh, informerSyncTimeout) } @@ -222,6 +228,12 @@ func (p *jobLifecycleLatencyMeasurement) gather(identifier string, timeout time. } p.stop() jobLifecycleLatency := p.jobStateEntries.CalculateTransitionsLatency(jobLifecycleTransitions, measurementutil.MatchAll) + jobCreationTimes := make(map[string]time.Time) + for jobName := range p.jobStateEntries.Keys() { + jobCreationTimes[jobName], _ = p.jobStateEntries.Get(jobName, jobCreated) + } + podCreationTime := p.podCreationTime.CalculateLatency(jobCreationTimes) + jobLifecycleLatency["create_to_pod_start"] = &podCreationTime content, jsonErr := util.PrettyPrintJSON(measurementutil.LatencyMapToPerfData(jobLifecycleLatency)) if jsonErr != nil { return nil, jsonErr @@ -234,3 +246,27 @@ func (p *jobLifecycleLatencyMeasurement) gather(identifier string, timeout time. 
func createMetaNamespaceKey(namespace, name string) string { return namespace + "/" + name } + +func (p *jobLifecycleLatencyMeasurement) getFuncToListJobEvents(c clientset.Interface) func() { + return func() { + klog.V(2).Infof("%s: list job events", p) + options := metav1.ListOptions{ + FieldSelector: "involvedObject.kind=Job", + } + events, err := client.ListEventsWithOptions(c, p.selector.Namespace, options) + if err != nil { + klog.Errorf("Failed to list events: %v", err) + return + } + for _, event := range events.Items { + key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name) + if !p.jobStateEntries.Exists(key) { + continue + } + if event.Reason != "SuccessfulCreate" { + continue + } + p.podCreationTime.Set(key, &event) + } + } +} diff --git a/clusterloader2/pkg/measurement/common/job_lifecycle_latency_test.go b/clusterloader2/pkg/measurement/common/job_lifecycle_latency_test.go new file mode 100644 index 0000000000..f01c93bf79 --- /dev/null +++ b/clusterloader2/pkg/measurement/common/job_lifecycle_latency_test.go @@ -0,0 +1,226 @@ +package common + +import ( + "context" + "encoding/json" + "testing" + "time" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/perf-tests/clusterloader2/pkg/framework" + "k8s.io/perf-tests/clusterloader2/pkg/measurement" +) + +func TestJobLifecycleLatencyMeasurement(t *testing.T) { + client := fake.NewSimpleClientset() + m := createJobLifecycleLatencyMeasurement().(*jobLifecycleLatencyMeasurement) + m.eventTicker.Reset(time.Second) + + startConfig := &measurement.Config{ + Params: map[string]interface{}{ + "action": "start", + }, + ClusterFramework: framework.NewFakeFramework(client), + } + if _, err := m.Execute(startConfig); err != nil { + t.Fatalf("Failed to start measurement: %v", err) + } + + start := time.Now() + namespace := "default" + jobs := []batchv1.Job{ + { + TypeMeta: 
metav1.TypeMeta{ + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "job0", + Namespace: namespace, + CreationTimestamp: metav1.Time{Time: start}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: start.Add(1 * time.Minute)}, + CompletionTime: &metav1.Time{Time: start.Add(2 * time.Minute)}, + }, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: namespace, + CreationTimestamp: metav1.Time{Time: start.Add(30 * time.Second)}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: start.Add(2 * time.Minute)}, + CompletionTime: &metav1.Time{Time: start.Add(4 * time.Minute)}, + }, + }, + } + for _, job := range jobs { + if _, err := client.BatchV1().Jobs(namespace).Create(context.TODO(), &job, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create job: %v", err) + } + } + + events := []corev1.Event{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Event", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "event0", + }, + InvolvedObject: corev1.ObjectReference{ + Name: "job0", + Namespace: namespace, + }, + Reason: "SuccessfulCreate", + Message: "Created pod: pod0", + FirstTimestamp: metav1.Time{Time: start.Add(2 * time.Minute)}, + LastTimestamp: metav1.Time{Time: start.Add(2 * time.Minute)}, + Count: 1, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Event", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "event1", + }, + InvolvedObject: corev1.ObjectReference{ + Name: "job1", + Namespace: namespace, + }, + Reason: "SuccessfulCreate", + Message: "Created pod: pod1", + FirstTimestamp: metav1.Time{Time: start.Add(5 * time.Minute)}, + LastTimestamp: metav1.Time{Time: start.Add(5 * time.Minute)}, + Count: 1, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Event", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "event2", + }, + InvolvedObject: corev1.ObjectReference{ + Name: "job1", + Namespace: namespace, + }, + Reason: "SuccessfulCreate", + Message: "(combined from similar 
events): Created pod: pod2", + FirstTimestamp: metav1.Time{Time: start.Add(time.Minute)}, + LastTimestamp: metav1.Time{Time: start.Add(6 * time.Minute)}, + Count: 11, + }, + } + for _, event := range events { + _, err := client.CoreV1().Events(namespace).Create(context.TODO(), &event, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create event: %v", err) + } + } + + time.Sleep(2 * time.Second) + + gatherConfig := &measurement.Config{ + Identifier: "test", + Params: map[string]interface{}{ + "action": "gather", + }, + ClusterFramework: framework.NewFakeFramework(client), + } + summaries, err := m.Execute(gatherConfig) + if err != nil { + t.Fatalf("Failed to gather measurement: %v", err) + } + + if len(summaries) != 1 { + t.Fatalf("Expect 1 summary, got %d", len(summaries)) + } + summary := summaries[0] + if summary.SummaryName() != "JobLifecycleLatency_test" { + t.Errorf("Unexpected summary name: %s", summary.SummaryName()) + } + var perfData map[string]interface{} + if err := json.Unmarshal([]byte(summary.SummaryContent()), &perfData); err != nil { + t.Fatalf("Failed to parse summary content as JSON: %v", err) + } + + { + createToPodStart := findDataItem(perfData, "create_to_pod_start") + if createToPodStart == nil { + t.Fatalf("Missing create_to_pod_start in summary") + } + gotP50 := getMetric(createToPodStart, "Perc50") + if gotP50 != 3*time.Minute { + t.Errorf("Expect create_to_pod_start Perc50 = 3 minutes, got %v", gotP50) + } + gotP90 := getMetric(createToPodStart, "Perc90") + if gotP90 != 5*time.Minute { + t.Errorf("Expect create_to_pod_start Perc90 = 5 minutes, got %v", gotP90) + } + gotP99 := getMetric(createToPodStart, "Perc99") + if gotP99 != 5*time.Minute+30*time.Second { + t.Errorf("Expect create_to_pod_start Perc99 = 5.5 minutes, got %v", gotP99) + } + } + { + createToStart := findDataItem(perfData, "create_to_start") + if createToStart == nil { + t.Fatalf("Missing create_to_start in summary") + } + gotP50 := getMetric(createToStart, 
"Perc50") + if gotP50 != time.Minute { + t.Errorf("Expect create_to_start Perc50 = 1 minute, got %v", gotP50) + } + gotP90 := getMetric(createToStart, "Perc90") + if gotP90 != time.Minute+30*time.Second { + t.Errorf("Expect create_to_start Perc90 = 1.5 minutes, got %v", gotP90) + } + gotP99 := getMetric(createToStart, "Perc99") + if gotP99 != time.Minute+30*time.Second { + t.Errorf("Expect create_to_start Perc99 = 1.5 minutes, got %v", gotP99) + } + } + { + startToComplete := findDataItem(perfData, "start_to_complete") + if startToComplete == nil { + t.Fatalf("Missing start_to_complete in summary") + } + gotP50 := getMetric(startToComplete, "Perc50") + if gotP50 != time.Minute { + t.Errorf("Expect start_to_complete Perc50 = 1 minutes, got %v", gotP50) + } + gotP90 := getMetric(startToComplete, "Perc90") + if gotP90 != 2*time.Minute { + t.Errorf("Expect start_to_complete Perc90 = 2 minutes, got %v", gotP90) + } + gotP99 := getMetric(startToComplete, "Perc99") + if gotP99 != 2*time.Minute { + t.Errorf("Expect start_to_complete Perc99 = 2 minutes, got %v", gotP99) + } + } +} + +func findDataItem(perfData map[string]interface{}, name string) map[string]interface{} { + dataItems := perfData["dataItems"].([]interface{}) + for _, item := range dataItems { + casted := item.(map[string]interface{}) + labels := casted["labels"].(map[string]interface{}) + if labels["Metric"] == name { + return casted + } + } + return nil +} + +func getMetric(dataItem map[string]interface{}, metric string) time.Duration { + return time.Duration(dataItem["data"].(map[string]interface{})[metric].(float64)) * time.Millisecond +} diff --git a/clusterloader2/pkg/measurement/util/cron.go b/clusterloader2/pkg/measurement/util/cron.go new file mode 100644 index 0000000000..44bde5676e --- /dev/null +++ b/clusterloader2/pkg/measurement/util/cron.go @@ -0,0 +1,34 @@ +/* +Copyright 2025 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import "time" + +func RunEveryTick(ticker *time.Ticker, task func(), stop <-chan struct{}) { + defer func() { + ticker.Stop() + }() + + for { + select { + case <-stop: + return + case <-ticker.C: + task() + } + } +} diff --git a/clusterloader2/pkg/measurement/util/cron_test.go b/clusterloader2/pkg/measurement/util/cron_test.go new file mode 100644 index 0000000000..be319d9997 --- /dev/null +++ b/clusterloader2/pkg/measurement/util/cron_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "runtime" + "testing" + "time" +) + +type mockTicker struct { + C chan time.Time + now time.Time +} + +func newMockTicker() *mockTicker { + return &mockTicker{ + C: make(chan time.Time, 5), + now: time.Now(), + } +} + +func (t *mockTicker) tick() { + t.now = t.now.Add(time.Minute) + t.C <- t.now +} + +func TestRunEveryTick(t *testing.T) { + tests := []struct { + name string + stop bool + expected int + }{ + { + name: "WithoutStop", + stop: false, + expected: 2, + }, + { + name: "WithStop", + stop: true, + expected: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + counter := 0 + increase := func() { + counter++ + } + ticker := newMockTicker() + stop := make(chan struct{}) + go RunEveryTick(&time.Ticker{C: ticker.C}, increase, stop) + + ticker.tick() + runtime.Gosched() + + if tt.stop { + close(stop) + runtime.Gosched() + } + + ticker.tick() + runtime.Gosched() + + if counter != tt.expected { + t.Errorf("Expect counter to be %d, got %d", tt.expected, counter) + } + }) + } +} diff --git a/clusterloader2/pkg/measurement/util/phase_latency.go b/clusterloader2/pkg/measurement/util/phase_latency.go index 587b1f8e62..27acb72e2a 100644 --- a/clusterloader2/pkg/measurement/util/phase_latency.go +++ b/clusterloader2/pkg/measurement/util/phase_latency.go @@ -18,10 +18,13 @@ package util import ( "fmt" + "regexp" "sort" "sync" "time" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" ) @@ -84,6 +87,13 @@ func (o *ObjectTransitionTimes) Count(phase string) int { return count } +func (o *ObjectTransitionTimes) Exists(key string) bool { + o.lock.Lock() + defer o.lock.Unlock() + _, exists := o.times[key] + return exists +} + // KeyFilterFunc is a function that for a given key returns whether // its corresponding entry should be included in the metric. 
type KeyFilterFunc func(string) bool @@ -146,6 +156,20 @@ func (o *ObjectTransitionTimes) printLatencies(latencies []LatencyData, header s klog.V(0).Infof("%s: perc50: %v, perc90: %v, perc99: %v%s", o.name, metrics.Perc50, metrics.Perc90, metrics.Perc99, thresholdString) } +// Keys returns a channel over which every currently stored key is sent; the channel is closed once all keys have been delivered. +func (o *ObjectTransitionTimes) Keys() <-chan string { + ch := make(chan string, len(o.times)) + go func() { + o.lock.Lock() + defer o.lock.Unlock() + for key := range o.times { + ch <- key + } + close(ch) + }() + return ch +} + type latencyData struct { key string latency time.Duration @@ -167,3 +190,75 @@ func LatencyMapToPerfData(latency map[string]*LatencyMetric) *PerfData { } return perfData } + +// EventTimeAndCount records the first/last timestamps and the count of a (possibly aggregated) event. +type EventTimeAndCount struct { + firstTimestamp metav1.Time + lastTimestamp metav1.Time + count int32 +} + +// PodCreationEventTimes tracks pod-creation events, keyed by owning object and then by pod name. +type PodCreationEventTimes struct { + lock sync.Mutex + events map[string]map[string]EventTimeAndCount + podNameRegex *regexp.Regexp +} + +// NewPodCreationEventTimes creates an empty PodCreationEventTimes. +func NewPodCreationEventTimes() *PodCreationEventTimes { + return &PodCreationEventTimes{ + events: make(map[string]map[string]EventTimeAndCount), + podNameRegex: regexp.MustCompile(`Created pod: (.*)`), + } +} + +// Set records a pod-creation event under the object identified by key. +func (pc *PodCreationEventTimes) Set(key string, event *corev1.Event) { + match := pc.podNameRegex.FindStringSubmatch(event.Message) + podName := match[1] + + pc.lock.Lock() + defer pc.lock.Unlock() + if _, exists := pc.events[key]; !exists { + pc.events[key] = make(map[string]EventTimeAndCount) + } + pc.events[key][podName] = EventTimeAndCount{ + firstTimestamp: event.FirstTimestamp, + lastTimestamp: event.LastTimestamp, + count: event.Count, + } +} + +// CalculateLatency computes the latency metric from job creation time to pod-creation event time. +func (pc *PodCreationEventTimes) CalculateLatency(jobCreationTimes map[string]time.Time) LatencyMetric { + pc.lock.Lock() + defer pc.lock.Unlock() + + latencies := make([]LatencyData, 0, len(pc.events)) + for jobName, events := range pc.events { + jobCreateTime, exists := jobCreationTimes[jobName] + if !exists { + continue + } + for podName, event := range events { + latencies = 
append(latencies, latencyData{ + key: podName, + latency: event.firstTimestamp.Sub(jobCreateTime), + }) + if event.count == 1 { + continue + } + // Assume individual events in aggregated events distribute uniformly. + interval := event.lastTimestamp.Sub(event.firstTimestamp.Time) / time.Duration(event.count-1) + for i := 1; i < int(event.count); i++ { + latencies = append(latencies, latencyData{ + key: podName, + latency: event.firstTimestamp.Add(interval * time.Duration(i)).Sub(jobCreateTime), + }) + } + } + } + sort.Sort(LatencySlice(latencies)) + return NewLatencyMetric(latencies) +} diff --git a/clusterloader2/pkg/measurement/util/phase_latency_test.go b/clusterloader2/pkg/measurement/util/phase_latency_test.go new file mode 100644 index 0000000000..dd6af0c7ed --- /dev/null +++ b/clusterloader2/pkg/measurement/util/phase_latency_test.go @@ -0,0 +1,88 @@ +package util + +import ( + "fmt" + "sort" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestObjectTransitionTimesIterator(t *testing.T) { + ott := NewObjectTransitionTimes("test") + ott.Set("obj0", "phase1", time.Now()) + ott.Set("obj1", "phase1", time.Now()) + ott.Set("obj2", "phase1", time.Now()) + + var objects []string + for obj := range ott.Keys() { + objects = append(objects, obj) + } + + if len(objects) != 3 { + t.Errorf("expect 3 objects, got %d", len(objects)) + } + + sort.Strings(objects) + for i := 0; i < 3; i++ { + expected := fmt.Sprintf("obj%d", i) + if objects[i] != expected { + t.Errorf("expect %q at index %d, got %q", expected, i, objects[i]) + } + } +} + +func TestPodCreationEventTimes(t *testing.T) { + start := time.Now() + pcet := NewPodCreationEventTimes() + pcet.Set( + "default/job0", + &corev1.Event{ + InvolvedObject: corev1.ObjectReference{ + Name: "job0", + }, + Message: "Created pod: pod0", + FirstTimestamp: metav1.Time{Time: start.Add(2 * time.Minute)}, + LastTimestamp: metav1.Time{Time: start.Add(2 * time.Minute)}, + 
Count: 1, + }) + pcet.Set( + "default/job1", + &corev1.Event{ + InvolvedObject: corev1.ObjectReference{ + Name: "job1", + }, + Message: "Created pod: pod1", + FirstTimestamp: metav1.Time{Time: start.Add(5 * time.Minute)}, + LastTimestamp: metav1.Time{Time: start.Add(5 * time.Minute)}, + Count: 1, + }) + pcet.Set( + "default/job1", + &corev1.Event{ + InvolvedObject: corev1.ObjectReference{ + Name: "job1", + }, + Message: "(combined from similar events): Created pod: pod2", + FirstTimestamp: metav1.Time{Time: start.Add(time.Minute)}, + LastTimestamp: metav1.Time{Time: start.Add(6 * time.Minute)}, + Count: 11, + }) + + jobCreationTimes := map[string]time.Time{ + "default/job0": start, + "default/job1": start.Add(30 * time.Second), + } + latencyMetric := pcet.CalculateLatency(jobCreationTimes) + if latencyMetric.Perc50 != 3*time.Minute { + t.Errorf("expected Perc50 = 3 minutes, got %v", latencyMetric.Perc50) + } + if latencyMetric.Perc90 != 5*time.Minute { + t.Errorf("expected Perc90 = 5 minutes, got %v", latencyMetric.Perc90) + } + if latencyMetric.Perc99 != 5*time.Minute+30*time.Second { + t.Errorf("expected Perc99 = 5.5 minutes, got %v", latencyMetric.Perc99) + } +}