forked from libi/dcron
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdcron.go
More file actions
162 lines (139 loc) · 3.7 KB
/
Copy pathdcron.go
File metadata and controls
162 lines (139 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package dcron
import (
"errors"
"log"
"os"
"time"
"github.com/leyou240/dcron/driver"
"github.com/robfig/cron/v3"
)
// Defaults that newDcron applies to a fresh Dcron.
const (
	// defaultReplicas is the initial value of Dcron.hashReplicas.
	defaultReplicas = 50

	// defaultDuration is the initial value of Dcron.nodeUpdateDuration.
	defaultDuration = time.Second
)
// Dcron is the main struct of the package. It ties together the registered
// jobs, the node pool and the underlying cron scheduler.
type Dcron struct {
	// jobs holds every job registered through addJob, keyed by job name.
	jobs map[string]*JobWarpper

	// ServerName is the server name passed to the constructor.
	ServerName string

	// nodePool is built by newNodePool in the constructors and is consulted
	// to decide whether this node may run a given job.
	nodePool *NodePool

	// isRun is set to true by Start/Run and back to false by Stop or when
	// the node pool fails to start.
	isRun bool

	// logger receives every message emitted by info and err.
	logger interface{ Printf(format string, v ...interface{}) }

	// nodeUpdateDuration is forwarded to newNodePool; it starts out as
	// defaultDuration.
	nodeUpdateDuration time.Duration

	// hashReplicas is forwarded to newNodePool; it starts out as
	// defaultReplicas.
	hashReplicas int

	// cr is the underlying cron scheduler that jobs are added to.
	cr *cron.Cron

	// crOptions are the options used to construct cr.
	crOptions []cron.Option
}
// NewDcron creates a Dcron for serverName backed by driver. The cron
// scheduler is built from cronOpts, and the node pool is created with the
// default update duration and hash replica count set by newDcron.
func NewDcron(serverName string, driver driver.Driver, cronOpts ...cron.Option) *Dcron {
	d := newDcron(serverName)

	// Remember the options, then build the scheduler from them.
	d.crOptions = cronOpts
	d.cr = cron.New(d.crOptions...)

	d.nodePool = newNodePool(serverName, driver, d, d.nodeUpdateDuration, d.hashReplicas)
	return d
}
// NewDcronWithOption creates a Dcron for serverName backed by driver. Each
// entry of dcronOpts is applied, in order, to the defaults from newDcron
// before the cron scheduler and the node pool are built, so options can
// influence both.
func NewDcronWithOption(serverName string, driver driver.Driver, dcronOpts ...Option) *Dcron {
	d := newDcron(serverName)
	for i := range dcronOpts {
		dcronOpts[i](d)
	}

	d.cr = cron.New(d.crOptions...)
	d.nodePool = newNodePool(serverName, driver, d, d.nodeUpdateDuration, d.hashReplicas)
	return d
}
// newDcron returns a Dcron populated with package defaults: a stdout logger
// prefixed with "[dcron] ", an empty job table, no cron options, and the
// default node update duration and hash replica count. The cron scheduler
// and node pool are left for the exported constructors to fill in.
func newDcron(serverName string) *Dcron {
	d := new(Dcron)
	d.ServerName = serverName
	d.logger = log.New(os.Stdout, "[dcron] ", log.LstdFlags)
	d.jobs = map[string]*JobWarpper{}
	d.crOptions = []cron.Option{}
	d.nodeUpdateDuration = defaultDuration
	d.hashReplicas = defaultReplicas
	return d
}
// SetLogger replaces the logger that info and err write to.
//
// A nil logger is ignored and the current logger is kept. Storing a nil
// *log.Logger in the interface-typed logger field would yield a non-nil
// interface wrapping a nil pointer, and the next log call would panic.
func (d *Dcron) SetLogger(logger *log.Logger) {
	if logger == nil {
		return
	}
	d.logger = logger
}
// GetLogger returns the logger currently installed on d.
func (d *Dcron) GetLogger() interface{ Printf(string, ...interface{}) } {
	current := d.logger
	return current
}
// info forwards a message to d.logger with "INFO: " prepended to format;
// v is passed through unchanged as the format arguments.
func (d *Dcron) info(format string, v ...interface{}) {
	prefixed := "INFO: " + format
	d.logger.Printf(prefixed, v...)
}
// err forwards a message to d.logger with "ERR: " prepended to format;
// v is passed through unchanged as the format arguments.
func (d *Dcron) err(format string, v ...interface{}) {
	prefixed := "ERR: " + format
	d.logger.Printf(prefixed, v...)
}
// AddJob registers job under jobName on the schedule described by cronStr.
// It delegates to addJob with no plain function attached and returns
// whatever error addJob reports.
func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error) {
	err = d.addJob(jobName, cronStr, nil, job)
	return err
}
// AddFunc registers the plain function cmd under jobName on the schedule
// described by cronStr. It delegates to addJob with no Job attached and
// returns whatever error addJob reports.
func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error) {
	err = d.addJob(jobName, cronStr, cmd, nil)
	return err
}
// addJob is the shared implementation behind AddJob and AddFunc.
//
// It logs the request, rejects a jobName that is already registered, wraps
// cmd/job in a JobWarpper, hands a copy of that wrapper to the cron
// scheduler and, on success, records the wrapper (now carrying the entry ID
// returned by the scheduler) in d.jobs. Any error from the scheduler is
// returned unchanged and nothing is recorded.
func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) {
	d.info("addJob '%s' : %s", jobName, cronStr)

	if _, exists := d.jobs[jobName]; exists {
		return errors.New("jobName already exist")
	}

	wrapper := &JobWarpper{
		Name:    jobName,
		CronStr: cronStr,
		Func:    cmd,
		Job:     job,
		Dcron:   d,
	}

	// The scheduler receives the wrapper by value, i.e. before ID is known.
	id, addErr := d.cr.AddJob(cronStr, *wrapper)
	if addErr != nil {
		return addErr
	}

	wrapper.ID = id
	d.jobs[jobName] = wrapper
	return nil
}
// Remove unregisters the job called jobName: it is dropped from d.jobs and
// its entry is removed from the cron scheduler. An unknown jobName is a
// no-op.
func (d *Dcron) Remove(jobName string) {
	job, found := d.jobs[jobName]
	if !found {
		return
	}

	delete(d.jobs, jobName)
	d.cr.Remove(job.ID)
}
// allowThisNodeRun reports whether this node is the one the node pool picks
// for jobName.
//
// An empty pick is reported as "node pool is empty" through err and yields
// false. The empty case is checked before the INFO line is written so that
// the log never claims a job is "running in node" with a blank node ID.
func (d *Dcron) allowThisNodeRun(jobName string) bool {
	allowRunNode := d.nodePool.PickNodeByJobName(jobName)
	if allowRunNode == "" {
		d.err("node pool is empty")
		return false
	}

	d.info("job '%s' running in node %s", jobName, allowRunNode)
	return d.nodePool.NodeID == allowRunNode
}
// Start marks d as running, starts the node pool and then calls Start on
// the cron scheduler. If the node pool fails to start, the running flag is
// cleared, the error is logged through err, and the scheduler is left
// untouched.
func (d *Dcron) Start() {
	d.isRun = true

	if startErr := d.nodePool.StartPool(); startErr != nil {
		d.isRun = false
		d.err("dcron start node pool error %+v", startErr)
		return
	}

	d.cr.Start()
	d.info("dcron started , nodeID is %s", d.nodePool.NodeID)
}
// Run marks d as running, starts the node pool, logs the node ID and then
// calls Run on the cron scheduler. If the node pool fails to start, the
// running flag is cleared, the error is logged through err, and the
// scheduler is left untouched.
func (d *Dcron) Run() {
	d.isRun = true

	if startErr := d.nodePool.StartPool(); startErr != nil {
		d.isRun = false
		d.err("dcron start node pool error %+v", startErr)
		return
	}

	// Log first: the message is emitted before control is handed to cr.Run.
	d.info("dcron running nodeID is %s", d.nodePool.NodeID)
	d.cr.Run()
}
// Stop clears the running flag, calls Stop on the cron scheduler and logs
// that dcron has stopped.
func (d *Dcron) Stop() {
	d.isRun = false

	// Whatever the scheduler's Stop returns is deliberately discarded; this
	// method does not wait on it.
	_ = d.cr.Stop()

	d.info("dcron stopped")
}