From 016e108c1c4bb15dd8bf09e2ead321b6283c1a1b Mon Sep 17 00:00:00 2001 From: Daniel Gruber Date: Sun, 3 Nov 2024 20:07:00 +0100 Subject: [PATCH 1/4] EH: Watch qacct file; improved types --- cmd/simulator/go.sum | 4 +- examples/testexample/go.mod | 5 +- examples/testexample/go.sum | 2 + examples/testexample/testexample.go | 78 ++++++++++-- go.mod | 1 + go.sum | 2 + pkg/qacct/v9.0/file.go | 90 ++++++++++++++ pkg/qacct/v9.0/file_test.go | 66 ++++++++++ pkg/qacct/v9.0/parse.go | 83 ++++++++----- pkg/qacct/v9.0/parse_test.go | 50 ++++++-- pkg/qacct/v9.0/types.go | 101 ++++++++------- pkg/qstat/v9.0/parser.go | 5 + pkg/qstat/v9.0/parser_test.go | 184 ++++++++++++++++++++++++++++ pkg/qstat/v9.0/qstat_impl.go | 8 ++ 14 files changed, 579 insertions(+), 100 deletions(-) create mode 100644 pkg/qacct/v9.0/file.go create mode 100644 pkg/qacct/v9.0/file_test.go create mode 100644 pkg/qstat/v9.0/parser_test.go diff --git a/cmd/simulator/go.sum b/cmd/simulator/go.sum index e741c24..d04547d 100644 --- a/cmd/simulator/go.sum +++ b/cmd/simulator/go.sum @@ -22,8 +22,8 @@ golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= diff --git a/examples/testexample/go.mod b/examples/testexample/go.mod index cfd68d1..202c47e 100644 --- a/examples/testexample/go.mod +++ b/examples/testexample/go.mod @@ -10,4 +10,7 @@ require ( google.golang.org/protobuf v1.35.1 ) -require go.uber.org/multierr v1.10.0 // indirect +require ( + github.com/goccy/go-json v0.10.3 // indirect + go.uber.org/multierr v1.10.0 // indirect +) diff --git a/examples/testexample/go.sum b/examples/testexample/go.sum index 64ed0e3..dff5629 100644 --- a/examples/testexample/go.sum +++ b/examples/testexample/go.sum @@ -4,6 +4,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= diff --git a/examples/testexample/testexample.go b/examples/testexample/testexample.go index 6dd8fcf..a6cb8f5 100644 --- a/examples/testexample/testexample.go +++ b/examples/testexample/testexample.go @@ -17,21 +17,34 @@ import ( var qacctClient qacct.QAcct var qstatClient qstat.QStat +var newlyFinishedJobs <-chan qacct.JobDetail + var log *zap.Logger func init() { var err error log, _ = zap.NewProduction() + + qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{}) + if err != nil { + log.Fatal("Failed to initialize qstat client", zap.String("error", + err.Error())) + } + qacctClient, err = qacct.NewCommandLineQAcct(qacct.CommandLineQAcctConfig{}) if err != nil { log.Fatal("Failed to initialize qacct client", zap.String("error", err.Error())) } - qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{}) + + // watch for newly finished jobs + newlyFinishedJobs, err = qacct.WatchFile(context.Background(), + qacct.GetDefaultQacctFile(), 1024) if err != nil { - log.Fatal("Failed to initialize qstat client", zap.String("error", - err.Error())) + log.Fatal("Failed to initialize job watcher", + zap.String("error", err.Error())) } + } func main() { @@ -48,7 +61,7 @@ func run(ctx context.Context) { log.Info("Context cancelled, stopping ClusterScheduler") return default: - finishedJobs, err := GetFinishedJobs() + finishedJobs, err := GetFinishedJobsWithWatcher() if err != nil { log.Error("Error getting finished jobs", zap.String("error", err.Error())) @@ -107,10 +120,47 @@ type SimpleJob struct { MasterNode string `json:"master_node"` } +func GetFinishedJobsWithWatcher() ([]*SimpleJob, error) { + jobs := []*SimpleJob{} + + for { + // get next job or timeout after 0.1s of there is no new job + select { + case fjob := <-newlyFinishedJobs: + state := fmt.Sprintf("%d", fjob.ExitStatus) + if state == "0" { + state = "done" + } else { + state = "failed" + } + simpleJob := SimpleJob{ + // ignore job arrays for now + JobId: fmt.Sprintf("%d", fjob.JobNumber), + Cluster: fjob.QName, + JobName: fjob.JobName, + Partition: fjob.GrantedPE, + Account: fjob.Account, + User: fjob.Owner, + State: state, + ExitCode: fmt.Sprintf("%d", fjob.ExitStatus), + Submit: parseTimestampInt64(fjob.SubmitTime), + Start: parseTimestampInt64(fjob.StartTime), + End: parseTimestampInt64(fjob.EndTime), + MasterNode: fjob.HostName, + } + jobs = append(jobs, &simpleJob) + case <-time.After(100 * time.Millisecond): + return jobs, nil + } + } + return jobs, nil +} + func GetFinishedJobs() ([]*SimpleJob, error) { // Use qacct NativeSpecification to get finished jobs qacctOutput, err := qacctClient.NativeSpecification([]string{"-j", "*"}) if err != nil { + // no job are command failed return nil, fmt.Errorf("error running qacct command: %v", err) } @@ -137,9 +187,9 @@ func GetFinishedJobs() ([]*SimpleJob, error) { User: job.Owner, State: state, ExitCode: fmt.Sprintf("%d", job.ExitStatus), - Submit: parseTimestamp(job.QSubTime), - Start: parseTimestamp(job.StartTime), - End: parseTimestamp(job.EndTime), + Submit: parseTimestampInt64(job.SubmitTime), + Start: parseTimestampInt64(job.StartTime), + End: parseTimestampInt64(job.EndTime), MasterNode: job.HostName, } } @@ -150,7 +200,8 @@ func GetRunningJobs() ([]*SimpleJob, error) { qstatOverview, err := qstatClient.NativeSpecification([]string{"-g", "t"}) if err != nil { - return nil, fmt.Errorf("error running qstat command: %v", err) + // no jobs running + return nil, nil } jobsByTask, err := qstat.ParseGroupByTask(qstatOverview) if err != nil { @@ -193,7 +244,8 @@ func GetRunningJobs() ([]*SimpleJob, error) { // get running jobs qstatOutput, err := qstatClient.NativeSpecification([]string{"-j", "*"}) if err != nil { - return nil, fmt.Errorf("error running qstat command: %v", err) + // no jobs running; qstat -j * found 0 jobs (TODO) + return nil, nil } jobs, err := qstat.ParseSchedulerJobInfo(qstatOutput) @@ -242,6 +294,14 @@ func SendJobs(ctx context.Context, jobs []*SimpleJob) (int, error) { return len(jobs), nil } +func parseTimestampInt64(ts int64) *timestamppb.Timestamp { + // ts is 6 digits behind the second (microseconds) + sec := ts / 1e6 + nsec := (ts - sec*1e6) * 1e3 + t := time.Unix(sec, nsec) + return timestamppb.New(t) +} + // 2024-10-24 09:49:59.911136 func parseTimestamp(s string) *timestamppb.Timestamp { loc, err := time.LoadLocation("Local") diff --git a/go.mod b/go.mod index 2ed5b84..4165b8c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/hpc-gridware/go-clusterscheduler go 1.22.4 require ( + github.com/goccy/go-json v0.10.3 github.com/onsi/ginkgo/v2 v2.19.1 github.com/onsi/gomega v1.34.1 go.opentelemetry.io/contrib/bridges/otelslog v0.5.0 diff --git a/go.sum b/go.sum index daec351..1f70016 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= diff --git a/pkg/qacct/v9.0/file.go b/pkg/qacct/v9.0/file.go new file mode 100644 index 0000000..4a3512a --- /dev/null +++ b/pkg/qacct/v9.0/file.go @@ -0,0 +1,90 @@ +package qacct + +import ( + "bufio" + "context" + "fmt" + "io" + "log" + "os" + "path/filepath" + "time" + + "github.com/goccy/go-json" +) + +// DefaultQacctFile returns the path to the default accounting file based +// on the SGE_ROOT and SGE_CELL environment variables. +func GetDefaultQacctFile() string { + sgeRoot := os.Getenv("SGE_ROOT") + sgeCell := os.Getenv("SGE_CELL") + return filepath.Join(sgeRoot, sgeCell, "common", "accounting.jsonl") +} + +// WatchFile returns a channel that emits all JobDetail objects from the accounting +// file. It continues to emit JobDetail objects as new lines are added to the file. +// The channel is buffered with the given buffer size. +func WatchFile(ctx context.Context, path string, bufferSize int) (<-chan JobDetail, error) { + if path == "" { + path = GetDefaultQacctFile() + } + + file, err := os.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return nil, fmt.Errorf("failed to open file: %v", err) + } + + jobDetailsChan := make(chan JobDetail, bufferSize) + + // offset points to the last processed line + var offset int64 = 0 + + go func() { + defer file.Close() + defer close(jobDetailsChan) + + scanner := bufio.NewScanner(file) + + for { + if _, err := file.Seek(offset, io.SeekStart); err != nil { + log.Printf("failed to seek to file end: %v", err) + return + } + + for scanner.Scan() { + var job JobDetail + line := scanner.Text() + // TODO parsing can be done in parallel + err := json.Unmarshal([]byte(line), &job) + if err != nil { + log.Printf("failed to unmarshal line: %v", err) + continue + } + jobDetailsChan <- job + } + + if err := scanner.Err(); err != nil { + log.Printf("JSONL parsing error: %v", err) + return + } + + // store processed offset + offset, err = file.Seek(0, io.SeekCurrent) + if err != nil { + log.Printf("failed to get current offset: %v", err) + return + } + + // wait a little before re-scanning for new data and reset scanner + select { + case <-ctx.Done(): + return + default: + <-time.After(1 * time.Second) + scanner = bufio.NewScanner(file) + } + } + }() + + return jobDetailsChan, nil +} diff --git a/pkg/qacct/v9.0/file_test.go b/pkg/qacct/v9.0/file_test.go new file mode 100644 index 0000000..d223804 --- /dev/null +++ b/pkg/qacct/v9.0/file_test.go @@ -0,0 +1,66 @@ +package qacct_test + +import ( + "context" + "fmt" + "log" + "slices" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + qacct "github.com/hpc-gridware/go-clusterscheduler/pkg/qacct/v9.0" + qsub "github.com/hpc-gridware/go-clusterscheduler/pkg/qsub/v9.0" +) + +var _ = Describe("File", func() { + + Context("WatchFile", func() { + + It("returns an error when the file does not exist", func() { + _, err := qacct.WatchFile(context.Background(), + "nonexistentfile.txt", 10) + Expect(err).To(HaveOccurred()) + }) + + It("returns a channel that emits JobDetail objects for 10 jobs", func() { + + qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{}) + Expect(err).NotTo(HaveOccurred()) + + jobIDs := make([]int, 10) + for i := 0; i < 10; i++ { + jobID, _, err := qs.Submit(context.Background(), qsub.JobOptions{ + Command: "echo", + CommandArgs: []string{fmt.Sprintf("job %d", i+1)}, + Binary: qsub.ToPtr(true), + }) + Expect(err).NotTo(HaveOccurred()) + log.Printf("jobID: %d", jobID) + jobIDs[i] = int(jobID) + } + + jobDetailsChan, err := qacct.WatchFile(context.Background(), + qacct.GetDefaultQacctFile(), 0) + Expect(err).NotTo(HaveOccurred()) + Expect(jobDetailsChan).NotTo(BeNil()) + + receivedJobs := make(map[int]bool) + Eventually(func() bool { + select { + case jd := <-jobDetailsChan: + log.Printf("job: %+v", jd.JobNumber) + // check if jobID is in the jobIDs list + if slices.Contains(jobIDs, int(jd.JobNumber)) { + Expect(jd.SubmitCommandLine).To(ContainSubstring("echo 'job")) + Expect(jd.JobUsage.Usage.Memory).To(BeNumerically(">=", 0)) + receivedJobs[int(jd.JobNumber)] = true + } + default: + return len(receivedJobs) == 10 + } + return false + }, "10s").Should(BeTrue()) + }) + }) +}) diff --git a/pkg/qacct/v9.0/parse.go b/pkg/qacct/v9.0/parse.go index d14c8c5..673d3bd 100644 --- a/pkg/qacct/v9.0/parse.go +++ b/pkg/qacct/v9.0/parse.go @@ -21,8 +21,10 @@ package qacct import ( "bufio" + "encoding/json" "strconv" "strings" + "time" ) func ParseQacctJobOutputWithScanner(scanner *bufio.Scanner) ([]JobDetail, error) { @@ -73,13 +75,13 @@ func ParseQacctJobOutputWithScanner(scanner *bufio.Scanner) ([]JobDetail, error) case "priority": job.Priority = parseInt64(value) case "qsub_time": - job.QSubTime = value + job.SubmitTime = parseTime(value) case "submit_cmd_line": job.SubmitCommandLine = value case "start_time": - job.StartTime = value + job.StartTime = parseTime(value) case "end_time": - job.EndTime = value + job.EndTime = parseTime(value) case "granted_pe": job.GrantedPE = value case "slots": @@ -89,55 +91,53 @@ func ParseQacctJobOutputWithScanner(scanner *bufio.Scanner) ([]JobDetail, error) case "exit_status": job.ExitStatus = parseInt64(value) case "ru_wallclock": - job.RuWallClock = parseFloat(value) + job.JobUsage.RUsage.RuWallclock = parseInt64(value) case "ru_utime": - job.RuUTime = parseFloat(value) + job.JobUsage.RUsage.RuUtime = parseFloat(value) case "ru_stime": - job.RuSTime = parseFloat(value) + job.JobUsage.RUsage.RuStime = parseFloat(value) case "ru_maxrss": - job.RuMaxRSS = parseInt64(value) + job.JobUsage.RUsage.RuMaxrss = parseInt64(value) case "ru_ixrss": - job.RuIXRSS = parseInt64(value) + job.JobUsage.RUsage.RuIxrss = parseInt64(value) case "ru_ismrss": - job.RuISMRSS = parseInt64(value) + job.JobUsage.RUsage.RuIsmrss = parseInt64(value) case "ru_idrss": - job.RuIDRSS = parseInt64(value) + job.JobUsage.RUsage.RuIdrss = parseInt64(value) case "ru_isrss": - job.RuISRss = parseInt64(value) + job.JobUsage.RUsage.RuIsrss = parseInt64(value) case "ru_minflt": - job.RuMinFlt = parseInt64(value) + job.JobUsage.RUsage.RuMinflt = parseInt64(value) case "ru_majflt": - job.RuMajFlt = parseInt64(value) + job.JobUsage.RUsage.RuMajflt = parseInt64(value) case "ru_nswap": - job.RuNSwap = parseInt64(value) + job.JobUsage.RUsage.RuNswap = parseInt64(value) case "ru_inblock": - job.RuInBlock = parseInt64(value) + job.JobUsage.RUsage.RuInblock = parseInt64(value) case "ru_oublock": - job.RuOuBlock = parseInt64(value) + job.JobUsage.RUsage.RuOublock = parseInt64(value) case "ru_msgsnd": - job.RuMsgSend = parseInt64(value) + job.JobUsage.RUsage.RuMsgsnd = parseInt64(value) case "ru_msgrcv": - job.RuMsgRcv = parseInt64(value) + job.JobUsage.RUsage.RuMsgrcv = parseInt64(value) case "ru_nsignals": - job.RuNSignals = parseInt64(value) + job.JobUsage.RUsage.RuNsignals = parseInt64(value) case "ru_nvcsw": - job.RuNVCSw = parseInt64(value) - case "ru_nivcsw": - job.RuNiVCSw = parseInt64(value) + job.JobUsage.RUsage.RuNvcsw = parseInt64(value) case "wallclock": - job.WallClock = parseFloat(value) + job.JobUsage.Usage.WallClock = parseFloat(value) case "cpu": - job.CPU = parseFloat(value) + job.JobUsage.Usage.CPU = parseFloat(value) case "mem": - job.Memory = parseFloat(value) + job.JobUsage.Usage.Memory = parseFloat(value) case "io": - job.IO = parseFloat(value) + job.JobUsage.Usage.IO = parseFloat(value) case "iow": - job.IOWait = parseFloat(value) + job.JobUsage.Usage.IOWait = parseFloat(value) case "maxvmem": - job.MaxVMem = parseInt64(value) + job.JobUsage.Usage.MaxVMem = parseFloat(value) case "maxrss": - job.MaxRSS = parseInt64(value) + job.JobUsage.Usage.MaxRSS = parseFloat(value) case "arid": job.ArID = value } @@ -161,6 +161,22 @@ func ParseQAcctJobOutput(output string) ([]JobDetail, error) { return jobs, nil } +/* +qsub_time 2024-09-27 07:41:44.421951 +submit_cmd_line qsub -b y -t 1-100:2 sleep 0 +start_time 2024-09-27 07:42:07.265733 +end_time 2024-09-27 07:42:08.796845 +*/ +func parseTime(value string) int64 { + // fix layout to match the output: "2024-09-27 07:42:08.796845" + layout := "2006-01-02 15:04:05.999999" // Correct layout for the given examples + t, err := time.Parse(layout, value) + if err != nil { + return 0 + } + return t.UnixNano() / 1000 +} + func parseInt(value string) int { i, _ := strconv.Atoi(value) return i @@ -175,3 +191,12 @@ func parseFloat(value string) float64 { f, _ := strconv.ParseFloat(value, 64) return f } + +func ParseAccountingJSONLine(line string) (JobDetail, error) { + var job JobDetail + err := json.Unmarshal([]byte(line), &job) + if err != nil { + return JobDetail{}, err + } + return job, nil +} diff --git a/pkg/qacct/v9.0/parse_test.go b/pkg/qacct/v9.0/parse_test.go index 2f2dec9..a78e5bc 100644 --- a/pkg/qacct/v9.0/parse_test.go +++ b/pkg/qacct/v9.0/parse_test.go @@ -148,26 +148,26 @@ Total System Usage Expect(job1.JobNumber).To(Equal(int64(8))) Expect(job1.TaskID).To(Equal(int64(97))) Expect(job1.Account).To(Equal("sge")) - Expect(job1.QSubTime).To(Equal("2024-09-27 07:41:44.421951")) - Expect(job1.StartTime).To(Equal("2024-09-27 07:42:07.272221")) - Expect(job1.EndTime).To(Equal("2024-09-27 07:42:08.801865")) + Expect(job1.SubmitTime).To(Equal(int64(1727422904421951))) + Expect(job1.StartTime).To(Equal(int64(1727422927272221))) + Expect(job1.EndTime).To(Equal(int64(1727422928801865))) Expect(job1.Failed).To(Equal(int64(0))) Expect(job1.ExitStatus).To(Equal(int64(0))) - Expect(job1.RuWallClock).To(Equal(1.0)) - Expect(job1.RuUTime).To(Equal(0.492)) - Expect(job1.RuSTime).To(Equal(0.234)) - Expect(job1.RuMaxRSS).To(Equal(int64(10300))) - Expect(job1.MaxVMem).To(Equal(int64(21045248))) - Expect(job1.MaxRSS).To(Equal(int64(10547200))) + Expect(job1.JobUsage.Usage.WallClock).To(Equal(3.487)) + Expect(job1.JobUsage.RUsage.RuUtime).To(Equal(0.492)) + Expect(job1.JobUsage.RUsage.RuStime).To(Equal(0.234)) + Expect(job1.JobUsage.RUsage.RuMaxrss).To(Equal(int64(10300))) + Expect(job1.JobUsage.Usage.MaxVMem).To(Equal(float64(21045248))) + Expect(job1.JobUsage.Usage.MaxRSS).To(Equal(float64(10547200))) job2 := jobs[1] Expect(job2.QName).To(Equal("all.q")) Expect(job2.HostName).To(Equal("master")) Expect(job2.JobNumber).To(Equal(int64(8))) Expect(job2.TaskID).To(Equal(int64(99))) - Expect(job2.QSubTime).To(Equal("2024-09-27 07:41:44.421951")) - Expect(job2.StartTime).To(Equal("2024-09-27 07:42:07.265733")) - Expect(job2.EndTime).To(Equal("2024-09-27 07:42:08.796845")) + Expect(job2.SubmitTime).To(Equal(int64(1727422904421951))) + Expect(job2.StartTime).To(Equal(int64(1727422927265733))) + Expect(job2.EndTime).To(Equal(int64(1727422928796845))) }) It("should handle empty input", func() { @@ -188,4 +188,30 @@ Total System Usage }) }) + + Context("Raw JSON", func() { + + sampleOutput := `{"job_number":10,"task_number":1,"start_time":1730532913429415,"end_time":1730532913979016,"owner":"root","group":"root","account":"sge","qname":"all.q","hostname":"master","department":"defaultdepartment","slots":1,"job_name":"echo","priority":0,"submission_time":1730532912874519,"submit_cmd_line":"qsub -b y -terse echo 'job 1'","category":"","failed":0,"exit_status":0,"usage":{"rusage":{"ru_wallclock":0,"ru_utime":0.355821,"ru_stime":0.161309,"ru_maxrss":10284,"ru_ixrss":0,"ru_ismrss":0,"ru_idrss":0,"ru_isrss":0,"ru_minflt":504,"ru_majflt":0,"ru_nswap":0,"ru_inblock":0,"ru_oublock":11,"ru_msgsnd":0,"ru_msgrcv":0,"ru_nsignals":0,"ru_nvcsw":248,"ru_nivcsw":14},"usage":{"wallclock":2.022342,"cpu":0.51713,"mem":0.0043125152587890625,"io":0.000008341856300830841,"iow":0.0,"maxvmem":21049344.0,"maxrss":10530816.0}}}` + + It("should parse raw JSON correctly", func() { + job, err := qacct.ParseAccountingJSONLine(sampleOutput) + Expect(err).To(BeNil()) + Expect(job).NotTo(BeNil()) + Expect(job.JobNumber).To(Equal(int64(10))) + Expect(job.TaskID).To(Equal(int64(1))) + Expect(job.StartTime).To(Equal(int64(1730532913429415))) + Expect(job.EndTime).To(Equal(int64(1730532913979016))) + Expect(job.SubmitTime).To(Equal(int64(1730532912874519))) + Expect(job.SubmitCommandLine).To(Equal("qsub -b y -terse echo 'job 1'")) + Expect(job.JobName).To(Equal("echo")) + Expect(job.Account).To(Equal("sge")) + Expect(job.Priority).To(Equal(int64(0))) + Expect(job.Failed).To(Equal(int64(0))) + Expect(job.ExitStatus).To(Equal(int64(0))) + Expect(job.JobUsage.RUsage.RuWallclock).To(Equal(int64(0))) + Expect(job.JobUsage.RUsage.RuUtime).To(Equal(float64(0.355821))) + Expect(job.JobUsage.RUsage.RuStime).To(Equal(float64(0.161309))) + Expect(job.JobUsage.RUsage.RuMaxrss).To(Equal(int64(10284))) + }) + }) }) diff --git a/pkg/qacct/v9.0/types.go b/pkg/qacct/v9.0/types.go index 1556331..0260783 100644 --- a/pkg/qacct/v9.0/types.go +++ b/pkg/qacct/v9.0/types.go @@ -85,53 +85,58 @@ type TaskUsage struct { JobDetail JobDetail } +// sampleOutput := `{"job_number":10,"task_number":1,"start_time":1730532913429415,"end_time":1730532913979016,"owner":"root","group":"root","account":"sge","qname":"all.q","hostname":"master","department":"defaultdepartment","slots":1,"job_name":"echo","priority":0,"submission_time":1730532912874519,"submit_cmd_line":"qsub -b y -terse echo 'job 1'","category":"","failed":0,"exit_status":0,"usage":{"rusage":{"ru_wallclock":0,"ru_utime":0.355821,"ru_stime":0.161309,"ru_maxrss":10284,"ru_ixrss":0,"ru_ismrss":0,"ru_idrss":0,"ru_isrss":0,"ru_minflt":504,"ru_majflt":0,"ru_nswap":0,"ru_inblock":0,"ru_oublock":11,"ru_msgsnd":0,"ru_msgrcv":0,"ru_nsignals":0,"ru_nvcsw":248,"ru_nivcsw":14},"usage":{"wallclock":2.022342,"cpu":0.51713,"mem":0.0043125152587890625,"io":0.000008341856300830841,"iow":0.0,"maxvmem":21049344.0,"maxrss":10530816.0}}}` + type JobDetail struct { - QName string `json:"qname"` - HostName string `json:"hostname"` - Group string `json:"group"` - Owner string `json:"owner"` - Project string `json:"project"` - Department string `json:"department"` - JobName string `json:"jobname"` - JobNumber int64 `json:"jobnumber"` - TaskID int64 `json:"taskid"` - PETaskID string `json:"pe_taskid"` - Account string `json:"account"` - Priority int64 `json:"priority"` - QSubTime string `json:"qsub_time"` - SubmitCommandLine string `json:"submit_command_line"` - StartTime string `json:"start_time"` - EndTime string `json:"end_time"` - GrantedPE string `json:"granted_pe"` - Slots int64 `json:"slots"` - Failed int64 `json:"failed"` - ExitStatus int64 `json:"exit_status"` - RuWallClock float64 `json:"ru_wallclock"` - RuUTime float64 `json:"ru_utime"` - RuSTime float64 `json:"ru_stime"` - RuMaxRSS int64 `json:"ru_maxrss"` - RuIXRSS int64 `json:"ru_ixrss"` - RuISMRSS int64 `json:"ru_ismrss"` - RuIDRSS int64 `json:"ru_idrss"` - RuISRss int64 `json:"ru_isrss"` - RuMinFlt int64 `json:"ru_minflt"` - RuMajFlt int64 `json:"ru_majflt"` - RuNSwap int64 `json:"ru_nswap"` - RuInBlock int64 `json:"ru_inblock"` - RuOuBlock int64 `json:"ru_oublock"` - RuMsgSend int64 `json:"ru_msgsnd"` - RuMsgRcv int64 `json:"ru_msgrcv"` - RuNSignals int64 `json:"ru_nsignals"` - RuNVCSw int64 `json:"ru_nvcsw"` - RuNiVCSw int64 `json:"ru_nivcsw"` - WallClock float64 `json:"wallclock"` - CPU float64 `json:"cpu"` - Memory float64 `json:"mem"` - IO float64 `json:"io"` - IOWait float64 `json:"iow"` - MaxVMem int64 `json:"maxvmem"` - MaxRSS int64 `json:"maxrss"` - ArID string `json:"arid"` + QName string `json:"qname"` + HostName string `json:"hostname"` + Group string `json:"group"` + Owner string `json:"owner"` + Project string `json:"project"` + Department string `json:"department"` + JobName string `json:"job_name"` + JobNumber int64 `json:"job_number"` + TaskID int64 `json:"task_number"` + PETaskID string `json:"pe_taskid"` + Account string `json:"account"` + Priority int64 `json:"priority"` + SubmitTime int64 `json:"submission_time"` + SubmitCommandLine string `json:"submit_cmd_line"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + GrantedPE string `json:"granted_pe"` + Slots int64 `json:"slots"` + Failed int64 `json:"failed"` + ExitStatus int64 `json:"exit_status"` + ArID string `json:"arid"` + JobUsage JobUsage `json:"usage"` +} + +type JobUsage struct { + Usage Usage `json:"usage"` + RUsage RUsage `json:"rusage"` +} + +// RUsage represents the resource usage data structure. +type RUsage struct { + RuWallclock int64 `json:"ru_wallclock"` + RuUtime float64 `json:"ru_utime"` + RuStime float64 `json:"ru_stime"` + RuMaxrss int64 `json:"ru_maxrss"` + RuIxrss int64 `json:"ru_ixrss"` + RuIsmrss int64 `json:"ru_ismrss"` + RuIdrss int64 `json:"ru_idrss"` + RuIsrss int64 `json:"ru_isrss"` + RuMinflt int64 `json:"ru_minflt"` + RuMajflt int64 `json:"ru_majflt"` + RuNswap int64 `json:"ru_nswap"` + RuInblock int64 `json:"ru_inblock"` + RuOublock int64 `json:"ru_oublock"` + RuMsgsnd int64 `json:"ru_msgsnd"` + RuMsgrcv int64 `json:"ru_msgrcv"` + RuNsignals int64 `json:"ru_nsignals"` + RuNvcsw int64 `json:"ru_nvcsw"` + RuNivcsw int64 `json:"ru_nivcsw"` } type PeUsage struct { @@ -144,7 +149,9 @@ type Usage struct { UserTime float64 `json:"utime"` SystemTime float64 `json:"stime"` CPU float64 `json:"cpu"` - Memory float64 `json:"memory"` + Memory float64 `json:"mem"` IO float64 `json:"io"` IOWait float64 `json:"iow"` + MaxVMem float64 `json:"maxvmem"` + MaxRSS float64 `json:"maxrss"` } diff --git a/pkg/qstat/v9.0/parser.go b/pkg/qstat/v9.0/parser.go index 4ec3207..cdb30bc 100644 --- a/pkg/qstat/v9.0/parser.go +++ b/pkg/qstat/v9.0/parser.go @@ -60,6 +60,11 @@ func ParseGroupByTask(input string) ([]ParallelJobTask, error) { func parseFixedWidthJobs(input string) ([]ParallelJobTask, error) { var tasks []ParallelJobTask + input = strings.TrimSpace(input) + if input == "" { + return tasks, nil + } + // Correct column positions based on your description columnPositions := []struct { start int diff --git a/pkg/qstat/v9.0/parser_test.go b/pkg/qstat/v9.0/parser_test.go new file mode 100644 index 0000000..093ce7a --- /dev/null +++ b/pkg/qstat/v9.0/parser_test.go @@ -0,0 +1,184 @@ +package qstat_test + +import ( + qstat "github.com/hpc-gridware/go-clusterscheduler/pkg/qstat/v9.0" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Parser", func() { + + Context("ParseGroupByTask", func() { + + It("should parse the output of qstat -g t", func() { + input := `job-ID prior name user state submit/start at queue master ja-task-ID +------------------------------------------------------------------------------------------------------------------ + 14 0.50500 sleep root r 2024-10-28 07:21:41 all.q@master MASTER + 15 0.50500 sleep root r 2024-10-28 07:26:14 all.q@master MASTER 1 + 15 0.50500 sleep root r 2024-10-28 07:26:14 all.q@master MASTER 3 + 15 0.50500 sleep root r 2024-10-28 07:26:14 all.q@master MASTER 5 + 17 0.60500 sleep root qw 2024-10-28 07:27:50 + 12 0.50500 sleep root qw 2024-10-28 07:17:34 + 15 0.50500 sleep root qw 2024-10-28 07:26:14 7-99:2` + jobs, err := qstat.ParseGroupByTask(input) + Expect(err).NotTo(HaveOccurred()) + Expect(len(jobs)).To(Equal(7)) + Expect(jobs[0].JobID).To(Equal(14)) + Expect(jobs[0].Name).To(Equal("sleep")) + Expect(jobs[0].User).To(Equal("root")) + Expect(jobs[0].State).To(Equal("r")) + Expect(jobs[0].SubmitStartAt).To(Equal("2024-10-28 07:21:41")) + Expect(jobs[0].Queue).To(Equal("all.q@master")) + Expect(jobs[0].Master).To(Equal("MASTER")) + Expect(jobs[0].TaskID).To(Equal("")) + Expect(jobs[1].JobID).To(Equal(15)) + Expect(jobs[1].Name).To(Equal("sleep")) + Expect(jobs[1].User).To(Equal("root")) + Expect(jobs[1].State).To(Equal("r")) + Expect(jobs[1].SubmitStartAt).To(Equal("2024-10-28 07:26:14")) + Expect(jobs[1].TaskID).To(Equal("1")) + Expect(jobs[2].JobID).To(Equal(15)) + Expect(jobs[2].Name).To(Equal("sleep")) + Expect(jobs[2].User).To(Equal("root")) + Expect(jobs[2].State).To(Equal("r")) + Expect(jobs[2].SubmitStartAt).To(Equal("2024-10-28 07:26:14")) + Expect(jobs[2].TaskID).To(Equal("3")) + Expect(jobs[3].JobID).To(Equal(15)) + Expect(jobs[3].Name).To(Equal("sleep")) + Expect(jobs[3].User).To(Equal("root")) + Expect(jobs[3].State).To(Equal("r")) + Expect(jobs[3].SubmitStartAt).To(Equal("2024-10-28 07:26:14")) + Expect(jobs[3].TaskID).To(Equal("5")) + Expect(jobs[4].JobID).To(Equal(17)) + Expect(jobs[4].Name).To(Equal("sleep")) + Expect(jobs[4].User).To(Equal("root")) + Expect(jobs[4].State).To(Equal("qw")) + Expect(jobs[4].SubmitStartAt).To(Equal("2024-10-28 07:27:50")) + Expect(jobs[4].TaskID).To(Equal("")) + Expect(jobs[5].JobID).To(Equal(12)) + Expect(jobs[5].Name).To(Equal("sleep")) + Expect(jobs[5].User).To(Equal("root")) + Expect(jobs[5].State).To(Equal("qw")) + Expect(jobs[5].SubmitStartAt).To(Equal("2024-10-28 07:17:34")) + Expect(jobs[5].TaskID).To(Equal("")) + }) + + It("should parse the output of qstat -g t", func() { + + output := `job-ID prior name user state submit/start at queue master ja-task-ID +------------------------------------------------------------------------------------------------------------------ + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@master SLAVE + all.q@master SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@master MASTER 1 + all.q@master SLAVE 1 + all.q@master SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@master SLAVE 2 + all.q@master SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim1 SLAVE + all.q@sim1 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim1 SLAVE 1 + all.q@sim1 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim1 SLAVE 2 + all.q@sim1 SLAVE 2 + all.q@sim1 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim10 SLAVE + all.q@sim10 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim10 SLAVE 1 + all.q@sim10 SLAVE 1 + all.q@sim10 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim10 SLAVE 2 + all.q@sim10 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim11 SLAVE + all.q@sim11 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim11 SLAVE 1 + all.q@sim11 SLAVE 1 + all.q@sim11 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim11 SLAVE 2 + all.q@sim11 SLAVE 2 + 18 0.50500 sleep root r 2024-10-28 08:33:57 all.q@sim12 MASTER + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim12 SLAVE + all.q@sim12 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim12 SLAVE 1 + all.q@sim12 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim12 SLAVE 2 + all.q@sim12 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim2 SLAVE + all.q@sim2 SLAVE + all.q@sim2 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim2 SLAVE 1 + all.q@sim2 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim2 SLAVE 2 + all.q@sim2 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim3 MASTER + all.q@sim3 SLAVE + all.q@sim3 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim3 SLAVE 1 + all.q@sim3 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim3 SLAVE 2 + all.q@sim3 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim4 SLAVE + all.q@sim4 SLAVE + all.q@sim4 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim4 SLAVE 1 + all.q@sim4 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim4 SLAVE 2 + all.q@sim4 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim5 SLAVE + all.q@sim5 SLAVE + all.q@sim5 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim5 SLAVE 1 + all.q@sim5 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim5 SLAVE 2 + all.q@sim5 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim6 SLAVE + all.q@sim6 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim6 SLAVE 1 + all.q@sim6 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim6 SLAVE 2 + all.q@sim6 SLAVE 2 + all.q@sim6 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim7 SLAVE + all.q@sim7 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim7 SLAVE 1 + all.q@sim7 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim7 MASTER 2 + all.q@sim7 SLAVE 2 + all.q@sim7 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim8 SLAVE + all.q@sim8 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim8 SLAVE 1 + all.q@sim8 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim8 SLAVE 2 + all.q@sim8 SLAVE 2 + all.q@sim8 SLAVE 2 + 19 0.60500 sleep root r 2024-10-28 08:34:25 all.q@sim9 SLAVE + all.q@sim9 SLAVE + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim9 SLAVE 1 + all.q@sim9 SLAVE 1 + all.q@sim9 SLAVE 1 + 20 0.60500 sleep root r 2024-10-28 08:34:36 all.q@sim9 SLAVE 2 + all.q@sim9 SLAVE 2 + 12 0.50500 sleep root qw 2024-10-28 07:17:34` + + jobs, err := qstat.ParseGroupByTask(output) + Expect(err).NotTo(HaveOccurred()) + Expect(len(jobs)).To(Equal(41)) + + // last job + Expect(jobs[40].JobID).To(Equal(12)) + Expect(jobs[40].Name).To(Equal("sleep")) + Expect(jobs[40].User).To(Equal("root")) + Expect(jobs[40].State).To(Equal("qw")) + Expect(jobs[40].SubmitStartAt).To(Equal("2024-10-28 07:17:34")) + + // job before last + Expect(jobs[39].JobID).To(Equal(20)) + Expect(jobs[39].TaskID).To(Equal("2")) + Expect(jobs[39].Queue).To(Equal("all.q@sim9")) + Expect(jobs[39].Master).To(Equal("SLAVE")) + Expect(jobs[39].SubmitStartAt).To(Equal("2024-10-28 08:34:36")) + }) + + }) + +}) diff --git a/pkg/qstat/v9.0/qstat_impl.go b/pkg/qstat/v9.0/qstat_impl.go index 230e6c3..3a98eb5 100644 --- a/pkg/qstat/v9.0/qstat_impl.go +++ b/pkg/qstat/v9.0/qstat_impl.go @@ -128,6 +128,14 @@ func (q *QStatImpl) NativeSpecification(args []string) (string, error) { command := exec.Command(q.config.Executable, args...) out, err := command.Output() if err != nil { + // convert error in exit error + ee, ok := err.(*exec.ExitError) + if ok { + if !ee.Success() { + return "", fmt.Errorf("qstat command failed with exit code %d", ee.ExitCode()) + } + return "", nil + } return "", fmt.Errorf("failed to get output of qstat: %w", err) } return string(out), nil From ea1413745da3042d6e3b94e743f031f7980fc6cc Mon Sep 17 00:00:00 2001 From: Daniel Gruber Date: Tue, 31 Dec 2024 08:12:38 +0100 Subject: [PATCH 2/4] BF: Fixing go.mod file for simulator --- Dockerfile | 2 +- Makefile | 8 ++++++-- cmd/simulator/go.mod | 4 +++- cmd/simulator/go.sum | 30 ++++++++++++++---------------- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/Dockerfile b/Dockerfile index a03ad48..9382940 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,7 +27,7 @@ COPY autoinstall.template /opt/helpers/ COPY installer.sh /opt/helpers/ COPY entrypoint.sh /entrypoint.sh -ARG GOLANG_VERSION=1.22.4 +ARG GOLANG_VERSION=1.23.1 RUN apt-get update && \ apt-get install -y curl wget git gcc make vim libhwloc-dev hwloc software-properties-common && \ diff --git a/Makefile b/Makefile index 1013648..9459924 100644 --- a/Makefile +++ b/Makefile @@ -34,13 +34,14 @@ build: # to run apptainers in containers. .PHONY: run-privileged run-privileged: build - @echo "Running the container..." + @echo "Running the Open Cluster Scheduler container in privileged mode..." mkdir -p ./installation docker run -p 7070:7070 --rm -it -h master --privileged -v /dev/fuse:/dev/fuse --cap-add SYS_ADMIN --name $(CONTAINER_NAME) -v ./installation:/opt/cs-install -v ./:/root/go/src/github.com/hpc-gridware/go-clusterscheduler $(IMAGE_NAME):$(IMAGE_TAG) /bin/bash .PHONY: run run: build - @echo "Running the container..." + @echo "Running the Open Cluster Scheduler container..." + @echo "For a new installation, you need to remove the ./installation subdirectory first." mkdir -p ./installation docker run -p 7070:7070 -p 9464:9464 --rm -it -h master --name $(CONTAINER_NAME) -v ./installation:/opt/cs-install -v ./:/root/go/src/github.com/hpc-gridware/go-clusterscheduler $(IMAGE_NAME):$(IMAGE_TAG) /bin/bash @@ -50,6 +51,9 @@ run: build .PHONY: simulate simulate: @echo "Running the container in simulation mode using cluster.json" + @echo "Removing subdirectory with old installation..." + rm -rf ./installation + @echo "Creating new subdirectory for installation..." mkdir -p ./installation docker run --rm -it -h master --privileged --cap-add SYS_ADMIN -p 9464:9464 --name $(CONTAINER_NAME) -v ./installation:/opt/cs-install -v ./:/root/go/src/github.com/hpc-gridware/go-clusterscheduler $(IMAGE_NAME):$(IMAGE_TAG) /bin/bash -c "cd /root/go/src/github.com/hpc-gridware/go-clusterscheduler/cmd/simulator && go build . && ./simulator run ../../cluster.json && /bin/bash" diff --git a/cmd/simulator/go.mod b/cmd/simulator/go.mod index 0039be6..5dae783 100644 --- a/cmd/simulator/go.mod +++ b/cmd/simulator/go.mod @@ -1,6 +1,8 @@ module github.com/hpc-gridware/go-clusterscheduler/cmd/simulator -go 1.22.5 +go 1.23.1 + +toolchain go1.23.4 replace github.com/hpc-gridware/go-clusterscheduler => ../.. diff --git a/cmd/simulator/go.sum b/cmd/simulator/go.sum index d04547d..f8fb7ce 100644 --- a/cmd/simulator/go.sum +++ b/cmd/simulator/go.sum @@ -5,29 +5,27 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= -github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= +github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0= -github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA= -github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= -github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= +github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= -golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 4298c05c21362836ab13877e4c7a3dd9480cd28b Mon Sep 17 00:00:00 2001 From: Daniel Gruber Date: Mon, 10 Feb 2025 20:29:45 +0100 Subject: [PATCH 3/4] Add qstat -g d --- pkg/qacct/v9.0/file_test.go | 23 ++++---- pkg/qstat/v9.0/parser.go | 108 ++++++++++++++++++++++++++++++++++ pkg/qstat/v9.0/parser_test.go | 43 ++++++++++++++ pkg/qstat/v9.0/qstat.go | 2 + pkg/qstat/v9.0/qstat_impl.go | 12 +++- 5 files changed, 176 insertions(+), 12 deletions(-) diff --git a/pkg/qacct/v9.0/file_test.go b/pkg/qacct/v9.0/file_test.go index d223804..a0464ae 100644 --- a/pkg/qacct/v9.0/file_test.go +++ b/pkg/qacct/v9.0/file_test.go @@ -25,26 +25,27 @@ var _ = Describe("File", func() { It("returns a channel that emits JobDetail objects for 10 jobs", func() { + jobDetailsChan, err := qacct.WatchFile(context.Background(), + qacct.GetDefaultQacctFile(), 0) + Expect(err).NotTo(HaveOccurred()) + Expect(jobDetailsChan).NotTo(BeNil()) + qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{}) Expect(err).NotTo(HaveOccurred()) jobIDs := make([]int, 10) for i := 0; i < 10; i++ { - jobID, _, err := qs.Submit(context.Background(), qsub.JobOptions{ - Command: "echo", - CommandArgs: []string{fmt.Sprintf("job %d", i+1)}, - Binary: qsub.ToPtr(true), - }) + jobID, _, err := qs.Submit(context.Background(), + qsub.JobOptions{ + Command: "/bin/bash", + CommandArgs: []string{"-c", fmt.Sprintf("echo job %d; sleep 0", i+1)}, + Binary: qsub.ToPtr(true), + }) Expect(err).NotTo(HaveOccurred()) log.Printf("jobID: %d", jobID) jobIDs[i] = int(jobID) } - jobDetailsChan, err := qacct.WatchFile(context.Background(), - qacct.GetDefaultQacctFile(), 0) - Expect(err).NotTo(HaveOccurred()) - Expect(jobDetailsChan).NotTo(BeNil()) - receivedJobs := make(map[int]bool) Eventually(func() bool { select { @@ -52,7 +53,7 @@ var _ = Describe("File", func() { log.Printf("job: %+v", jd.JobNumber) // check if jobID is in the jobIDs list if slices.Contains(jobIDs, int(jd.JobNumber)) { - Expect(jd.SubmitCommandLine).To(ContainSubstring("echo 'job")) + Expect(jd.SubmitCommandLine).To(ContainSubstring("bash")) Expect(jd.JobUsage.Usage.Memory).To(BeNumerically(">=", 0)) receivedJobs[int(jd.JobNumber)] = true } diff --git a/pkg/qstat/v9.0/parser.go b/pkg/qstat/v9.0/parser.go index 6e40623..e722399 100644 --- a/pkg/qstat/v9.0/parser.go +++ b/pkg/qstat/v9.0/parser.go @@ -830,3 +830,111 @@ func ParseClusterQueueSummary(out string) ([]ClusterQueueSummary, error) { return summaries, nil } + +/* +qstat -g d +job-ID prior name user state submit/start at queue slots ja-task-ID +----------------------------------------------------------------------------------------------------------------- + + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 1 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 3 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 5 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 7 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 25 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 27 + 36 0.60500 sleep root qw 2025-02-10 16:52:21 2 + 37 0.60500 sleep root qw 2025-02-10 16:52:35 2 + 38 0.60500 sleep root qw 2025-02-10 16:52:49 2 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 1 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 2 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 3 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 8 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 9 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 10 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 29 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 31 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 99 + 34 0.50500 sleep root qw 2025-02-10 16:51:51 1 +*/ +func ParseJobArrayTask(out string) ([]JobArrayTask, error) { + lines := strings.Split(out, "\n") + + jobArrayTasks := make([]JobArrayTask, 0, len(lines)-3) + + for _, line := range lines[2:] { + fields := strings.Fields(line) + if len(fields) < 8 { + continue + } + jobID, err := strconv.Atoi(fields[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse jobID: %v", err) + } + priority, err := strconv.ParseFloat(fields[1], 64) + if err != nil { + return nil, fmt.Errorf("failed to parse priority: %v", err) + } + name := fields[2] + user := fields[3] + state := fields[4] + timeString := fields[5] + " " + fields[6] + jobTime, err := time.Parse("2006-01-02 15:04:05", timeString) + if err != nil { + return nil, fmt.Errorf("failed to parse submit time: %v", err) + } + var submitTime time.Time + var startTime time.Time + if strings.Contains(state, "qw") { + startTime = jobTime + } else { + submitTime = jobTime + } + + // if fields[7] is not a number, it is the queue name + var slots int + var taskID int + var queue string + + // when waiting there is no queue name + if slotsInt, err := strconv.Atoi(fields[7]); err != nil { + queue = fields[7] + if len(fields) > 8 { + slots, _ = strconv.Atoi(fields[8]) + } + if len(fields) > 9 { + taskID, _ = strconv.Atoi(fields[9]) + } + } else { + slots = slotsInt + // waiting jobs + if len(fields) > 8 { + slots, _ = strconv.Atoi(fields[8]) + } + if len(fields) > 9 { + taskID, err = strconv.Atoi(fields[9]) + if err != nil { + // a single job and parallel job has no taskID + taskID = 0 + } + } + } + + jobInfo := JobInfo{ + JobID: jobID, + Priority: priority, + Name: name, + User: user, + State: state, + SubmitTime: submitTime, + StartTime: startTime, + Queue: queue, + Slots: slots, + JaTaskIDs: []int64{int64(taskID)}, + } + jobArrayTasks = append(jobArrayTasks, JobArrayTask{ + JobInfo: jobInfo, + }) + + } + return jobArrayTasks, nil +} diff --git a/pkg/qstat/v9.0/parser_test.go b/pkg/qstat/v9.0/parser_test.go index b45f211..d1da3fa 100644 --- a/pkg/qstat/v9.0/parser_test.go +++ b/pkg/qstat/v9.0/parser_test.go @@ -452,4 +452,47 @@ test.q 0.08 0 0 2 2 0 0 }) + Describe("JobArrayTask", func() { + + It("should parse the output of qstat -g d", func() { + input := `job-ID prior name user state submit/start at queue slots ja-task-ID +----------------------------------------------------------------------------------------------------------------- + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 1 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 3 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 5 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 23 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 25 + 33 0.50500 sleep root r 2025-02-10 16:47:18 all.q@master 1 27 + 36 0.60500 sleep root qw 2025-02-10 16:52:21 2 + 37 0.60500 sleep root qw 2025-02-10 16:52:35 2 + 38 0.60500 sleep root qw 2025-02-10 16:52:49 2 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 1 + 39 0.60500 sleep root qw 2025-02-10 16:53:23 2 2 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 95 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 97 + 33 0.50500 sleep root qw 2025-02-10 16:47:18 1 99 + 34 0.50500 sleep root qw 2025-02-10 16:51:51 1 +` + jobArrayTasks, err := qstat.ParseJobArrayTask(input) + Expect(err).NotTo(HaveOccurred()) + Expect(len(jobArrayTasks)).To(Equal(15)) + + Expect(jobArrayTasks).To(ContainElement(qstat.JobArrayTask{ + JobInfo: qstat.JobInfo{ + JobID: 33, + Priority: 0.505, + Name: "sleep", + User: "root", + State: "r", + SubmitTime: time.Date(2025, 2, 10, 16, 47, 18, 0, time.UTC), + StartTime: time.Time{}, + Queue: "all.q@master", + Slots: 1, + JaTaskIDs: []int64{1}, + }, + })) + }) + + }) + }) diff --git a/pkg/qstat/v9.0/qstat.go b/pkg/qstat/v9.0/qstat.go index 09c569f..4a97977 100644 --- a/pkg/qstat/v9.0/qstat.go +++ b/pkg/qstat/v9.0/qstat.go @@ -45,7 +45,9 @@ type QStat interface { ShowFullOutputWithResources(resourceAttributes string) ([]JobInfo, error) // qstat -g c DisplayClusterQueueSummary() ([]ClusterQueueSummary, error) + // qstat -g d shows all job array tasks individually DisplayAllJobArrayTasks() ([]JobArrayTask, error) + // qstat -g p shows all parallel job tasks individually DisplayAllParallelJobTasks() ([]ParallelJobTask, error) // qstat -help ShowHelp() (string, error) diff --git a/pkg/qstat/v9.0/qstat_impl.go b/pkg/qstat/v9.0/qstat_impl.go index 983b713..f1146e8 100644 --- a/pkg/qstat/v9.0/qstat_impl.go +++ b/pkg/qstat/v9.0/qstat_impl.go @@ -178,8 +178,18 @@ func (q *QStatImpl) DisplayClusterQueueSummary() ([]ClusterQueueSummary, error) return ParseClusterQueueSummary(out) } +// DisplayAllJobArrayTasks is equivalent to "qstat -g d" func (q *QStatImpl) DisplayAllJobArrayTasks() ([]JobArrayTask, error) { - return nil, fmt.Errorf("not implemented") + out, err := q.NativeSpecification([]string{"-g", "d"}) + if err != nil { + return nil, fmt.Errorf("failed to get output of qstat: %w", err) + } + jobArrayTasks, err := ParseJobArrayTask(out) + if err != nil { + return nil, fmt.Errorf("failed to parse job array tasks: %w", err) + } + + return jobArrayTasks, nil } func (q *QStatImpl) DisplayAllParallelJobTasks() ([]ParallelJobTask, error) { From 468ab1e5889b5b778b9d667093aa83ca62581bf8 Mon Sep 17 00:00:00 2001 From: Daniel Gruber Date: Tue, 11 Feb 2025 08:49:56 +0100 Subject: [PATCH 4/4] qstat -g d parser fix --- Dockerfile | 2 +- Makefile | 2 +- pkg/qstat/v9.0/parser.go | 8 ++++++-- pkg/qstat/v9.0/parser_test.go | 27 +++++++++++++++++++++++++-- 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2320a7b..9004728 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,7 @@ COPY autoinstall.template /opt/helpers/ COPY installer.sh /opt/helpers/ COPY entrypoint.sh /entrypoint.sh -ARG GOLANG_VERSION=1.23.5 +ARG GOLANG_VERSION=1.23.6 RUN apt-get update && \ apt-get install -y curl wget git gcc make vim libhwloc-dev hwloc software-properties-common man-db && \ diff --git a/Makefile b/Makefile index fe07999..9be0022 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ # This Makefile is used for development and testing purposes only. IMAGE_NAME = $(shell basename $(CURDIR)) -IMAGE_TAG = V901_TAG +IMAGE_TAG = V902_TAG CONTAINER_NAME = $(IMAGE_NAME) .PHONY: build diff --git a/pkg/qstat/v9.0/parser.go b/pkg/qstat/v9.0/parser.go index e722399..e356e4a 100644 --- a/pkg/qstat/v9.0/parser.go +++ b/pkg/qstat/v9.0/parser.go @@ -859,7 +859,11 @@ job-ID prior name user state submit/start at queue func ParseJobArrayTask(out string) ([]JobArrayTask, error) { lines := strings.Split(out, "\n") - jobArrayTasks := make([]JobArrayTask, 0, len(lines)-3) + jobArrayTasks := make([]JobArrayTask, 0, len(lines)) + + if len(lines) < 2 { + return jobArrayTasks, nil + } for _, line := range lines[2:] { fields := strings.Fields(line) @@ -884,7 +888,7 @@ func ParseJobArrayTask(out string) ([]JobArrayTask, error) { } var submitTime time.Time var startTime time.Time - if strings.Contains(state, "qw") { + if !strings.Contains(state, "qw") { startTime = jobTime } else { submitTime = jobTime diff --git a/pkg/qstat/v9.0/parser_test.go b/pkg/qstat/v9.0/parser_test.go index d1da3fa..797b1d9 100644 --- a/pkg/qstat/v9.0/parser_test.go +++ b/pkg/qstat/v9.0/parser_test.go @@ -484,13 +484,36 @@ test.q 0.08 0 0 2 2 0 0 Name: "sleep", User: "root", State: "r", - SubmitTime: time.Date(2025, 2, 10, 16, 47, 18, 0, time.UTC), - StartTime: time.Time{}, + StartTime: time.Date(2025, 2, 10, 16, 47, 18, 0, time.UTC), + SubmitTime: time.Time{}, Queue: "all.q@master", Slots: 1, JaTaskIDs: []int64{1}, }, })) + + Expect(jobArrayTasks).To(ContainElement(qstat.JobArrayTask{ + JobInfo: qstat.JobInfo{ + JobID: 36, + Priority: 0.605, + Name: "sleep", + User: "root", + State: "qw", + SubmitTime: time.Date(2025, 2, 10, 16, 52, 21, 0, time.UTC), + StartTime: time.Time{}, + Queue: "", + Slots: 2, + JaTaskIDs: []int64{0}, + }, + })) + + }) + + It("should parse an empty input", func() { + input := "" + jobArrayTasks, err := qstat.ParseJobArrayTask(input) + Expect(err).NotTo(HaveOccurred()) + Expect(len(jobArrayTasks)).To(Equal(0)) }) })