forked from shunfei/cronsun
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
nicojwzhang
committed
Jul 27, 2021
1 parent
54b7e29
commit ff2288a
Showing
5 changed files
with
346 additions
and
330 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package cronsun | ||
|
||
import ( | ||
"time" | ||
|
||
cron "github.com/robfig/cron/v3" | ||
"github.com/shunfei/cronsun/conf" | ||
"github.com/shunfei/cronsun/log" | ||
) | ||
|
||
type Cmd struct { | ||
EntryID cron.EntryID | ||
*Job | ||
*JobRule | ||
} | ||
|
||
func (c *Cmd) GetID() cron.EntryID { | ||
return c.EntryID | ||
} | ||
|
||
func (c *Cmd) Run() { | ||
log.Debugf("c:%+v", c) | ||
// 同时执行任务数限制 | ||
if c.Job.limit() { | ||
return | ||
} | ||
defer c.Job.unlimit() | ||
|
||
if c.Job.Kind != KindCommon { | ||
lk := c.lock() | ||
if lk == nil { | ||
return | ||
} | ||
defer lk.unlock() | ||
} | ||
|
||
if c.Job.Retry <= 0 { | ||
c.Job.Run() | ||
return | ||
} | ||
|
||
for i := 0; i <= c.Job.Retry; i++ { | ||
if c.Job.Run() { | ||
return | ||
} | ||
|
||
if c.Job.Interval > 0 { | ||
time.Sleep(time.Duration(c.Job.Interval) * time.Second) | ||
} | ||
} | ||
} | ||
func (c *Cmd) lockTtl() int64 { | ||
now := time.Now() | ||
prev := c.JobRule.Schedule.Next(now) | ||
ttl := int64(c.JobRule.Schedule.Next(prev).Sub(prev) / time.Second) | ||
if ttl == 0 { | ||
return 0 | ||
} | ||
|
||
if c.Job.Kind == KindInterval { | ||
ttl -= 2 | ||
if ttl > conf.Config.LockTtl { | ||
ttl = conf.Config.LockTtl | ||
} | ||
if ttl < 1 { | ||
ttl = 1 | ||
} | ||
return ttl | ||
} | ||
|
||
cost := c.Job.AvgTime / 1e3 | ||
if c.Job.AvgTime/1e3-cost*1e3 > 0 { | ||
cost += 1 | ||
} | ||
// 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1 | ||
// 以便下次执行的时候,能获取到 lock | ||
if ttl >= cost { | ||
ttl -= cost | ||
} | ||
|
||
if ttl > conf.Config.LockTtl { | ||
ttl = conf.Config.LockTtl | ||
} | ||
|
||
// 支持的最小时间间隔 2s | ||
if ttl < 2 { | ||
ttl = 2 | ||
} | ||
|
||
return ttl | ||
} | ||
|
||
func (c *Cmd) newLock() *locker { | ||
return &locker{ | ||
kind: c.Job.Kind, | ||
ttl: c.lockTtl(), | ||
done: make(chan struct{}), | ||
} | ||
} | ||
|
||
func (c *Cmd) lock() *locker { | ||
lk := c.newLock() | ||
// 非法的 rule | ||
if lk.ttl == 0 { | ||
return nil | ||
} | ||
|
||
resp, err := DefalutClient.Grant(lk.ttl) | ||
if err != nil { | ||
log.Infof("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) | ||
return nil | ||
} | ||
|
||
ok, err := DefalutClient.GetLock(c.Job.ID, resp.ID) | ||
if err != nil { | ||
log.Infof("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) | ||
return nil | ||
} | ||
|
||
if !ok { | ||
return nil | ||
} | ||
|
||
lk.lID = resp.ID | ||
if lk.kind == KindAlone { | ||
go lk.keepAlive() | ||
} | ||
return lk | ||
} |
Oops, something went wrong.