diff --git a/cmd.go b/cmd.go new file mode 100644 index 0000000..a0eac22 --- /dev/null +++ b/cmd.go @@ -0,0 +1,129 @@ +package cronsun + +import ( + "time" + + cron "github.com/robfig/cron/v3" + "github.com/shunfei/cronsun/conf" + "github.com/shunfei/cronsun/log" +) + +type Cmd struct { + EntryID cron.EntryID + *Job + *JobRule +} + +func (c *Cmd) GetID() cron.EntryID { + return c.EntryID +} + +func (c *Cmd) Run() { + log.Debugf("c:%+v", c) + // 同时执行任务数限制 + if c.Job.limit() { + return + } + defer c.Job.unlimit() + + if c.Job.Kind != KindCommon { + lk := c.lock() + if lk == nil { + return + } + defer lk.unlock() + } + + if c.Job.Retry <= 0 { + c.Job.Run() + return + } + + for i := 0; i <= c.Job.Retry; i++ { + if c.Job.Run() { + return + } + + if c.Job.Interval > 0 { + time.Sleep(time.Duration(c.Job.Interval) * time.Second) + } + } +} +func (c *Cmd) lockTtl() int64 { + now := time.Now() + prev := c.JobRule.Schedule.Next(now) + ttl := int64(c.JobRule.Schedule.Next(prev).Sub(prev) / time.Second) + if ttl == 0 { + return 0 + } + + if c.Job.Kind == KindInterval { + ttl -= 2 + if ttl > conf.Config.LockTtl { + ttl = conf.Config.LockTtl + } + if ttl < 1 { + ttl = 1 + } + return ttl + } + + cost := c.Job.AvgTime / 1e3 + if c.Job.AvgTime/1e3-cost*1e3 > 0 { + cost += 1 + } + // 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1 + // 以便下次执行的时候,能获取到 lock + if ttl >= cost { + ttl -= cost + } + + if ttl > conf.Config.LockTtl { + ttl = conf.Config.LockTtl + } + + // 支持的最小时间间隔 2s + if ttl < 2 { + ttl = 2 + } + + return ttl +} + +func (c *Cmd) newLock() *locker { + return &locker{ + kind: c.Job.Kind, + ttl: c.lockTtl(), + done: make(chan struct{}), + } +} + +func (c *Cmd) lock() *locker { + lk := c.newLock() + // 非法的 rule + if lk.ttl == 0 { + return nil + } + + resp, err := DefalutClient.Grant(lk.ttl) + if err != nil { + log.Infof("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) + return nil + } + + ok, err := DefalutClient.GetLock(c.Job.ID, resp.ID) + if err != nil { + log.Infof("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) + return nil + } + + if !ok { + return nil + } + + lk.lID = resp.ID + if lk.kind == KindAlone { + go lk.keepAlive() + } + return lk +} diff --git a/job.go b/job.go index 463dd9a..a4eebe5 100644 --- a/job.go +++ b/job.go @@ -77,95 +77,6 @@ type Job struct { Count *int64 `json:"-"` } -type JobRule struct { - ID string `json:"id"` - Timer string `json:"timer"` - GroupIDs []string `json:"gids"` - NodeIDs []string `json:"nids"` - ExcludeNodeIDs []string `json:"exclude_nids"` - - Schedule cron.Schedule `json:"-"` -} - -// 任务锁 -type locker struct { - kind int - ttl int64 - lID client.LeaseID - timer *time.Timer - done chan struct{} -} - -func (l *locker) keepAlive() { - duration := time.Duration(l.ttl)*time.Second - 500*time.Millisecond - l.timer = time.NewTimer(duration) - for { - select { - case <-l.done: - return - case <-l.timer.C: - _, err := DefalutClient.KeepAliveOnce(l.lID) - if err != nil { - log.Warnf("lock keep alive err: %s", err.Error()) - return - } - l.timer.Reset(duration) - } - } -} - -func (l *locker) unlock() { - if l.kind != KindAlone { - return - } - - close(l.done) - l.timer.Stop() - if _, err := DefalutClient.Revoke(l.lID); err != nil { - log.Warnf("unlock revoke err: %s", err.Error()) - } -} - -type Cmd struct { - *Job - *JobRule -} - -func (c *Cmd) GetID() string { - return c.Job.ID + c.JobRule.ID -} - -func (c *Cmd) Run() { - // 同时执行任务数限制 - if c.Job.limit() { - return - } - defer c.Job.unlimit() - - if c.Job.Kind != KindCommon { - lk := c.lock() - if lk == nil { - return - } - defer lk.unlock() - } - - if c.Job.Retry <= 0 { - c.Job.Run() - return - } - - for i := 0; i <= c.Job.Retry; i++ { - if c.Job.Run() { - return - } - - if c.Job.Interval > 0 { - time.Sleep(time.Duration(c.Job.Interval) * time.Second) - } - } -} - func (j *Job) limit() bool { if j.Parallels == 0 { return false @@ -194,203 +105,6 @@ func (j *Job) Init(nodeID, hostname, ip string) { var c int64 j.Count, j.runOn, j.hostname, j.ip = &c, nodeID, hostname, ip } - -func (c *Cmd) lockTtl() int64 { - now := time.Now() - prev := c.JobRule.Schedule.Next(now) - ttl := int64(c.JobRule.Schedule.Next(prev).Sub(prev) / time.Second) - if ttl == 0 { - return 0 - } - - if c.Job.Kind == KindInterval { - ttl -= 2 - if ttl > conf.Config.LockTtl { - ttl = conf.Config.LockTtl - } - if ttl < 1 { - ttl = 1 - } - return ttl - } - - cost := c.Job.AvgTime / 1e3 - if c.Job.AvgTime/1e3-cost*1e3 > 0 { - cost += 1 - } - // 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1 - // 以便下次执行的时候,能获取到 lock - if ttl >= cost { - ttl -= cost - } - - if ttl > conf.Config.LockTtl { - ttl = conf.Config.LockTtl - } - - // 支持的最小时间间隔 2s - if ttl < 2 { - ttl = 2 - } - - return ttl -} - -func (c *Cmd) newLock() *locker { - return &locker{ - kind: c.Job.Kind, - ttl: c.lockTtl(), - done: make(chan struct{}), - } -} - -func (c *Cmd) lock() *locker { - lk := c.newLock() - // 非法的 rule - if lk.ttl == 0 { - return nil - } - - resp, err := DefalutClient.Grant(lk.ttl) - if err != nil { - log.Infof("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) - return nil - } - - ok, err := DefalutClient.GetLock(c.Job.ID, resp.ID) - if err != nil { - log.Infof("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) - return nil - } - - if !ok { - return nil - } - - lk.lID = resp.ID - if lk.kind == KindAlone { - go lk.keepAlive() - } - return lk -} - -// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理 -func (rule *JobRule) included(nid string, gs map[string]*Group) bool { - for i, count := 0, len(rule.NodeIDs); i < count; i++ { - if nid == rule.NodeIDs[i] { - return true - } - } - - for _, gid := range rule.GroupIDs { - if g, ok := gs[gid]; ok && g.Included(nid) { - return true - } - } - - return false -} - -// 验证 timer 字段 -func (rule *JobRule) Valid() error { - // 注意 interface nil 的比较 - if rule.Schedule != nil { - return nil - } - - if len(rule.Timer) == 0 { - return ErrNilRule - } - - sch, err := cron.ParseStandard(rule.Timer) - if err != nil { - return fmt.Errorf("invalid JobRule[%s], parse err: %s", rule.Timer, err.Error()) - } - - rule.Schedule = sch - return nil -} - -// Note: this function did't check the job. -func GetJob(group, id string) (job *Job, err error) { - job, _, err = GetJobAndRev(group, id) - return -} - -func GetJobAndRev(group, id string) (job *Job, rev int64, err error) { - resp, err := DefalutClient.Get(JobKey(group, id)) - if err != nil { - return - } - - if resp.Count == 0 { - err = ErrNotFound - return - } - - rev = resp.Kvs[0].ModRevision - if err = json.Unmarshal(resp.Kvs[0].Value, &job); err != nil { - return - } - - job.splitCmd() - return -} - -func DeleteJob(group, id string) (resp *client.DeleteResponse, err error) { - return DefalutClient.Delete(JobKey(group, id)) -} - -// 获取所有任务 -func GetJobs() (jobs map[string]*Job, err error) { - log.Debugf("") - resp, err := DefalutClient.Get(conf.Config.Cmd, client.WithPrefix()) - if err != nil { - log.Errorf("DefalutClient.Get() failed, err:%s", err) - return jobs, err - } - - count := len(resp.Kvs) - jobs = make(map[string]*Job, count) - if count == 0 { - log.Warnf("job count is 0") - return jobs, err - } - - for _, j := range resp.Kvs { - job := new(Job) - if err := json.Unmarshal(j.Value, job); err != nil { - log.Warnf("job[%s] umarshal err: %s", string(j.Key), err) - continue - } - - if err := job.Valid(); err != nil { - log.Warnf("job[%s] is invalid: %s", string(j.Key), err) - continue - } - - job.alone() - jobs[job.ID] = job - } - return jobs, err -} - -func WatchJobs() client.WatchChan { - return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix()) -} - -func GetJobFromKv(key, value []byte) (job *Job, err error) { - job = new(Job) - if err = json.Unmarshal(value, job); err != nil { - err = fmt.Errorf("job[%s] umarshal err: %s", string(key), err.Error()) - return - } - - err = job.Valid() - job.alone() - return -} - func (j *Job) alone() { if j.Kind == KindAlone { j.Parallels = 1 @@ -505,20 +219,6 @@ func (j *Job) RunWithRecovery() { j.Run() } -// 从 etcd 的 key 中取 id -func GetIDFromKey(key string) string { - index := strings.LastIndex(key, "/") - if index < 0 { - return "" - } - - return key[index+1:] -} - -func JobKey(group, id string) string { - return conf.Config.Cmd + group + "/" + id -} - func (j *Job) Key() string { return JobKey(j.Group, j.ID) } @@ -617,9 +317,9 @@ func (j *Job) Avg(t, et time.Time) { } // -func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) { +func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[cron.EntryID]*Cmd) { log.Debugf("nid:%s, gs:%+v", nid, gs) - cmds = make(map[string]*Cmd) + cmds = make(map[cron.EntryID]*Cmd) if j.Pause { return cmds } @@ -770,3 +470,97 @@ func (j *Job) CreateCmdAttr() (*syscall.SysProcAttr, error) { return sysProcAttr, nil } + +// Note: this function did't check the job. +func GetJob(group, id string) (job *Job, err error) { + job, _, err = GetJobAndRev(group, id) + return +} + +func GetJobAndRev(group, id string) (job *Job, rev int64, err error) { + resp, err := DefalutClient.Get(JobKey(group, id)) + if err != nil { + return + } + + if resp.Count == 0 { + err = ErrNotFound + return + } + + rev = resp.Kvs[0].ModRevision + if err = json.Unmarshal(resp.Kvs[0].Value, &job); err != nil { + return + } + + job.splitCmd() + return +} + +func DeleteJob(group, id string) (resp *client.DeleteResponse, err error) { + return DefalutClient.Delete(JobKey(group, id)) +} + +// 获取所有任务 +func GetJobs() (jobs map[string]*Job, err error) { + log.Debugf("") + resp, err := DefalutClient.Get(conf.Config.Cmd, client.WithPrefix()) + if err != nil { + log.Errorf("DefalutClient.Get() failed, err:%s", err) + return jobs, err + } + + count := len(resp.Kvs) + jobs = make(map[string]*Job, count) + if count == 0 { + log.Warnf("job count is 0") + return jobs, err + } + + for _, j := range resp.Kvs { + job := new(Job) + if err := json.Unmarshal(j.Value, job); err != nil { + log.Warnf("job[%s] umarshal err: %s", string(j.Key), err) + continue + } + + if err := job.Valid(); err != nil { + log.Warnf("job[%s] is invalid: %s", string(j.Key), err) + continue + } + + job.alone() + jobs[job.ID] = job + } + return jobs, err +} + +func WatchJobs() client.WatchChan { + return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix()) +} + +func GetJobFromKv(key, value []byte) (job *Job, err error) { + job = new(Job) + if err = json.Unmarshal(value, job); err != nil { + err = fmt.Errorf("job[%s] umarshal err: %s", string(key), err.Error()) + return + } + + err = job.Valid() + job.alone() + return +} + +// 从 etcd 的 key 中取 id +func GetIDFromKey(key string) string { + index := strings.LastIndex(key, "/") + if index < 0 { + return "" + } + + return key[index+1:] +} + +func JobKey(group, id string) string { + return conf.Config.Cmd + group + "/" + id +} diff --git a/jobRule.go b/jobRule.go new file mode 100644 index 0000000..50e35e9 --- /dev/null +++ b/jobRule.go @@ -0,0 +1,54 @@ +package cronsun + +import ( + "fmt" + + cron "github.com/robfig/cron/v3" +) + +type JobRule struct { + ID string `json:"id"` + Timer string `json:"timer"` + GroupIDs []string `json:"gids"` + NodeIDs []string `json:"nids"` + ExcludeNodeIDs []string `json:"exclude_nids"` + + Schedule cron.Schedule `json:"-"` +} + +// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理 +func (rule *JobRule) included(nid string, gs map[string]*Group) bool { + for i, count := 0, len(rule.NodeIDs); i < count; i++ { + if nid == rule.NodeIDs[i] { + return true + } + } + + for _, gid := range rule.GroupIDs { + if g, ok := gs[gid]; ok && g.Included(nid) { + return true + } + } + + return false +} + +// 验证 timer 字段 +func (rule *JobRule) Valid() error { + // 注意 interface nil 的比较 + if rule.Schedule != nil { + return nil + } + + if len(rule.Timer) == 0 { + return ErrNilRule + } + + sch, err := cron.ParseStandard(rule.Timer) + if err != nil { + return fmt.Errorf("invalid JobRule[%s], parse err: %s", rule.Timer, err.Error()) + } + + rule.Schedule = sch + return nil +} diff --git a/locker.go b/locker.go new file mode 100644 index 0000000..d14d256 --- /dev/null +++ b/locker.go @@ -0,0 +1,47 @@ +package cronsun + +import ( + "time" + + client "github.com/coreos/etcd/clientv3" + "github.com/shunfei/cronsun/log" +) + +// 任务锁 +type locker struct { + kind int + ttl int64 + lID client.LeaseID + timer *time.Timer + done chan struct{} +} + +func (l *locker) keepAlive() { + duration := time.Duration(l.ttl)*time.Second - 500*time.Millisecond + l.timer = time.NewTimer(duration) + for { + select { + case <-l.done: + return + case <-l.timer.C: + _, err := DefalutClient.KeepAliveOnce(l.lID) + if err != nil { + log.Warnf("lock keep alive err: %s", err.Error()) + return + } + l.timer.Reset(duration) + } + } +} + +func (l *locker) unlock() { + if l.kind != KindAlone { + return + } + + close(l.done) + l.timer.Stop() + if _, err := DefalutClient.Revoke(l.lID); err != nil { + log.Warnf("unlock revoke err: %s", err.Error()) + } +} diff --git a/node/node.go b/node/node.go index 9c60cf7..24e67e1 100644 --- a/node/node.go +++ b/node/node.go @@ -27,7 +27,7 @@ type Node struct { jobs Jobs // 和结点相关的任务 groups Groups - cmds map[string]*cronsun.Cmd + cmds map[cron.EntryID]*cronsun.Cmd link // 删除的 job id,用于 group 更新 @@ -67,7 +67,7 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) { Cron: cron.New(), jobs: make(Jobs, 8), - cmds: make(map[string]*cronsun.Cmd), + cmds: make(map[cron.EntryID]*cronsun.Cmd), link: newLink(8), delIDs: make(map[string]bool, 8), @@ -201,15 +201,15 @@ func (n *Node) loadJobs() (err error) { for _, job := range jobs { job.Init(n.ID, n.Hostname, n.IP) - n.addJob(job, false) + n.addJob(job) } return } // n.jobs和n.cmds的区别是什么? -func (n *Node) addJob(job *cronsun.Job, notice bool) error { - log.Debugf("job:%+v, notice:%t", job, notice) +func (n *Node) addJob(job *cronsun.Job) error { + log.Debugf("job:%+v", job) n.link.addJob(job) // 判断该job是否会在该节点运行 @@ -225,7 +225,7 @@ func (n *Node) addJob(job *cronsun.Job, notice bool) error { } for _, cmd := range cmds { - n.addCmd(cmd, notice) + n.addCmd(cmd) } return nil } @@ -257,7 +257,7 @@ func (n *Node) modJob(job *cronsun.Job) { oJob, ok := n.jobs[job.ID] // 之前此任务没有在当前结点执行,直接增加任务 if !ok { - n.addJob(job, true) + n.addJob(job) return } @@ -269,7 +269,7 @@ func (n *Node) modJob(job *cronsun.Job) { cmds := oJob.Cmds(n.ID, n.groups) for id, cmd := range cmds { - n.modCmd(cmd, true) + n.modCmd(cmd) delete(prevCmds, id) } @@ -281,22 +281,19 @@ func (n *Node) modJob(job *cronsun.Job) { } // 将任务添加到cron中 -func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) error { - log.Debugf("cmd:%+v, notice:%t", cmd, notice) - n.Cron.Schedule(cmd.JobRule.Schedule, cmd) - n.cmds[cmd.GetID()] = cmd - - if notice { - log.Infof("job[%s] group[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Job.Group, cmd.JobRule.ID, cmd.JobRule.Timer) - } +func (n *Node) addCmd(cmd *cronsun.Cmd) error { + log.Debugf("cmd:%+v", cmd) + // 接受一个实现了Cmd interface的对象作为参数,实现run方法即可 + entryId := n.Cron.Schedule(cmd.JobRule.Schedule, cmd) + n.cmds[entryId] = cmd return nil } -func (n *Node) modCmd(cmd *cronsun.Cmd, notice bool) { - log.Debugf("cmd:%+v, notice:%t", cmd, notice) +func (n *Node) modCmd(cmd *cronsun.Cmd) { + log.Debugf("cmd:%+v", cmd) c, ok := n.cmds[cmd.GetID()] if !ok { - n.addCmd(cmd, notice) + n.addCmd(cmd) return } @@ -308,17 +305,12 @@ func (n *Node) modCmd(cmd *cronsun.Cmd, notice bool) { if c.JobRule.Timer != sch { n.Cron.Schedule(c.JobRule.Schedule, c) } - - if notice { - log.Infof("job[%s] group[%s] rule[%s] timer[%s] has updated", c.Job.ID, c.Job.Group, c.JobRule.ID, c.JobRule.Timer) - } } func (n *Node) delCmd(cmd *cronsun.Cmd) { log.Debugf("cmd:%+v", cmd) delete(n.cmds, cmd.GetID()) - n.Cron.DelJob(cmd) - log.Infof("job[%s] group[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Job.Group, cmd.JobRule.ID, cmd.JobRule.Timer) + n.Cron.Remove(cmd.GetID()) } func (n *Node) addGroup(g *cronsun.Group) { @@ -416,7 +408,7 @@ func (n *Node) groupAddNode(g *cronsun.Group) { cmds := job.Cmds(n.ID, n.groups) for _, cmd := range cmds { - n.addCmd(cmd, true) + n.addCmd(cmd) } } return @@ -443,7 +435,7 @@ func (n *Node) groupRmNode(g, og *cronsun.Group) { cmds := job.Cmds(n.ID, n.groups) for id, cmd := range cmds { - n.addCmd(cmd, true) + n.addCmd(cmd) delete(prevCmds, id) } @@ -479,7 +471,7 @@ func (n *Node) watchJobs() { } job.Init(n.ID, n.Hostname, n.IP) - n.addJob(job, true) + n.addJob(job) case ev.IsModify(): job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil {