diff --git a/go.mod b/go.mod index e5bb34d..60ca13a 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e // indirect github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect + github.com/robfig/cron/v3 v3.0.0 // indirect github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af github.com/sirupsen/logrus v1.0.6 // indirect github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf // indirect diff --git a/go.sum b/go.sum index 12746a6..78bf9e4 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,8 @@ github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e h1:n/3MEhJQjQxrO github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 h1:agujYaXJSxSo18YNX3jzl+4G6Bstwt+kqv47GS12uL0= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af h1:gu+uRPtBe88sKxUCEXRoeCvVG90TJmwhiqRpvdhQFng= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/sirupsen/logrus v1.0.6 h1:hcP1GmhGigz/O7h1WVUM5KklBp1JoNS9FggWKdj/j3s= diff --git a/job.go b/job.go index 7c30bdb..463dd9a 100644 --- a/job.go +++ b/job.go @@ -17,9 +17,9 @@ import ( client "github.com/coreos/etcd/clientv3" + cron "github.com/robfig/cron/v3" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" - "github.com/shunfei/cronsun/node/cron" "github.com/shunfei/cronsun/utils" ) @@ -302,7 +302,7 @@ func (rule *JobRule) Valid() error { return ErrNilRule } - sch, err := cron.Parse(rule.Timer) + sch, err := cron.ParseStandard(rule.Timer) if err != nil { return fmt.Errorf("invalid JobRule[%s], parse err: %s", rule.Timer, err.Error()) } @@ -343,33 +343,36 @@ func DeleteJob(group, id string) (resp *client.DeleteResponse, err error) { // 获取所有任务 func GetJobs() (jobs map[string]*Job, err error) { + log.Debugf("") resp, err := DefalutClient.Get(conf.Config.Cmd, client.WithPrefix()) if err != nil { - return + log.Errorf("DefalutClient.Get() failed, err:%s", err) + return jobs, err } count := len(resp.Kvs) jobs = make(map[string]*Job, count) if count == 0 { - return + log.Warnf("job count is 0") + return jobs, err } for _, j := range resp.Kvs { job := new(Job) - if e := json.Unmarshal(j.Value, job); e != nil { - log.Warnf("job[%s] umarshal err: %s", string(j.Key), e.Error()) + if err := json.Unmarshal(j.Value, job); err != nil { + log.Warnf("job[%s] umarshal err: %s", string(j.Key), err) continue } if err := job.Valid(); err != nil { - log.Warnf("job[%s] is invalid: %s", string(j.Key), err.Error()) + log.Warnf("job[%s] is invalid: %s", string(j.Key), err) continue } job.alone() jobs[job.ID] = job } - return + return jobs, err } func WatchJobs() client.WatchChan { @@ -422,7 +425,7 @@ func (j *Job) GetNextRunTime() time.Time { return nextTime } for i, r := range j.Rules { - sch, err := cron.Parse(r.Timer) + sch, err := cron.ParseStandard(r.Timer) if err != nil { return nextTime } @@ -618,7 +621,7 @@ func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) { log.Debugf("nid:%s, gs:%+v", nid, gs) cmds = make(map[string]*Cmd) if j.Pause { - return + return cmds } LOOP_TIMER_CMD: @@ -643,7 +646,7 @@ LOOP_TIMER_CMD: } } - return + return cmds } // 判断该job是否会在该节点上运行 diff --git a/node/cron/LICENSE b/node/cron/LICENSE deleted file mode 100644 index 3a0f627..0000000 --- a/node/cron/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (C) 2012 Rob Figueiredo -All Rights Reserved. - -MIT LICENSE - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/node/cron/README.md b/node/cron/README.md deleted file mode 100644 index 157ed08..0000000 --- a/node/cron/README.md +++ /dev/null @@ -1,2 +0,0 @@ -[![GoDoc](http://godoc.org/github.com/robfig/cron?status.png)](http://godoc.org/github.com/robfig/cron) -[![Build Status](https://travis-ci.org/robfig/cron.svg?branch=master)](https://travis-ci.org/robfig/cron) diff --git a/node/cron/at.go b/node/cron/at.go deleted file mode 100644 index 4f6ab8c..0000000 --- a/node/cron/at.go +++ /dev/null @@ -1,34 +0,0 @@ -package cron - -import ( - "sort" - "time" -) - -// TimeListSchedule will run at the specify giving time. -type TimeListSchedule struct { - timeList []time.Time -} - -// At returns a crontab Schedule that activates every specify time. -func At(tl []time.Time) *TimeListSchedule { - sort.Slice(tl, func(i, j int) bool { return tl[i].Unix() < tl[j].Unix() }) - return &TimeListSchedule{ - timeList: tl, - } -} - -// Next returns the next time this should be run. -// This rounds so that the next activation time will be on the second. -func (schedule *TimeListSchedule) Next(t time.Time) time.Time { - cur := 0 - for cur < len(schedule.timeList) { - nextt := schedule.timeList[cur] - cur++ - if nextt.UnixNano() <= t.UnixNano() { - continue - } - return nextt - } - return time.Time{} -} diff --git a/node/cron/at_test.go b/node/cron/at_test.go deleted file mode 100644 index fc10cd8..0000000 --- a/node/cron/at_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package cron - -import ( - "testing" - "time" -) - -func TestTimeListNext(t *testing.T) { - tests := []struct { - startTime string - times []string - expected []string - }{ - // Simple cases - { - "2018-09-01 08:01:02", - []string{"2018-09-01 10:01:02"}, - []string{"2018-09-01 10:01:02"}, - }, - - // sort list - { - "2018-09-01 08:01:02", - []string{"2018-09-01 10:01:02", "2018-09-02 10:01:02"}, - []string{"2018-09-01 10:01:02", "2018-09-02 10:01:02"}, - }, - - // sort list with middle start time - { - "2018-09-01 10:11:02", - []string{"2018-09-01 10:01:02", "2018-09-02 10:01:02"}, - []string{"2018-09-02 10:01:02"}, - }, - - // unsorted list - { - "2018-07-01 08:01:02", - []string{"2018-09-01 10:01:00", "2018-08-01 10:00:00", "2018-09-01 10:00:00", "2018-08-02 10:01:02"}, - []string{"2018-08-01 10:00:00", "2018-08-02 10:01:02", "2018-09-01 10:00:00", "2018-09-01 10:01:00"}, - }, - - // unsorted list with middle start time - { - "2018-08-03 12:00:00", - []string{"2018-09-01 10:01:00", "2018-08-01 10:00:00", "2018-09-01 10:00:00", "2018-08-02 10:01:02"}, - []string{"2018-09-01 10:00:00", "2018-09-01 10:01:00"}, - }, - } - - for _, c := range tests { - tls := At(getAtTimes(c.times)) - nextTime := getAtTime(c.startTime) - for _, trun := range c.expected { - actual := tls.Next(nextTime) - expected := getAtTime(trun) - if actual != expected { - t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", - c.startTime, c.times, expected, actual) - } - nextTime = actual - } - if actual := tls.Next(nextTime); !actual.IsZero() { - t.Errorf("%s, \"%s\": next time should be zero, but got %v (actual)", - c.startTime, c.times, actual) - } - - } -} - -func getAtTime(value string) time.Time { - if value == "" { - panic("time string is empty") - } - - t, err := time.Parse("2006-01-02 15:04:05", value) - if err != nil { - panic(err) - } - - return t -} - -func getAtTimes(values []string) []time.Time { - tl := []time.Time{} - for _, v := range values { - tl = append(tl, getAtTime(v)) - } - return tl -} diff --git a/node/cron/constantdelay.go b/node/cron/constantdelay.go deleted file mode 100644 index cd6e7b1..0000000 --- a/node/cron/constantdelay.go +++ /dev/null @@ -1,27 +0,0 @@ -package cron - -import "time" - -// ConstantDelaySchedule represents a simple recurring duty cycle, e.g. "Every 5 minutes". -// It does not support jobs more frequent than once a second. -type ConstantDelaySchedule struct { - Delay time.Duration -} - -// Every returns a crontab Schedule that activates once every duration. -// Delays of less than a second are not supported (will round up to 1 second). -// Any fields less than a Second are truncated. -func Every(duration time.Duration) ConstantDelaySchedule { - if duration < time.Second { - duration = time.Second - } - return ConstantDelaySchedule{ - Delay: duration - time.Duration(duration.Nanoseconds())%time.Second, - } -} - -// Next returns the next time this should be run. -// This rounds so that the next activation time will be on the second. -func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time { - return t.Add(schedule.Delay - time.Duration(t.Nanosecond())*time.Nanosecond) -} diff --git a/node/cron/constantdelay_test.go b/node/cron/constantdelay_test.go deleted file mode 100644 index f43a58a..0000000 --- a/node/cron/constantdelay_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package cron - -import ( - "testing" - "time" -) - -func TestConstantDelayNext(t *testing.T) { - tests := []struct { - time string - delay time.Duration - expected string - }{ - // Simple cases - {"Mon Jul 9 14:45 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, - {"Mon Jul 9 14:59 2012", 15 * time.Minute, "Mon Jul 9 15:14 2012"}, - {"Mon Jul 9 14:59:59 2012", 15 * time.Minute, "Mon Jul 9 15:14:59 2012"}, - - // Wrap around hours - {"Mon Jul 9 15:45 2012", 35 * time.Minute, "Mon Jul 9 16:20 2012"}, - - // Wrap around days - {"Mon Jul 9 23:46 2012", 14 * time.Minute, "Tue Jul 10 00:00 2012"}, - {"Mon Jul 9 23:45 2012", 35 * time.Minute, "Tue Jul 10 00:20 2012"}, - {"Mon Jul 9 23:35:51 2012", 44*time.Minute + 24*time.Second, "Tue Jul 10 00:20:15 2012"}, - {"Mon Jul 9 23:35:51 2012", 25*time.Hour + 44*time.Minute + 24*time.Second, "Thu Jul 11 01:20:15 2012"}, - - // Wrap around months - {"Mon Jul 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Oct 9 00:00 2012"}, - - // Wrap around minute, hour, day, month, and year - {"Mon Dec 31 23:59:45 2012", 15 * time.Second, "Tue Jan 1 00:00:00 2013"}, - - // Round to nearest second on the delay - {"Mon Jul 9 14:45 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, - - // Round up to 1 second if the duration is less. - {"Mon Jul 9 14:45:00 2012", 15 * time.Millisecond, "Mon Jul 9 14:45:01 2012"}, - - // Round to nearest second when calculating the next time. - {"Mon Jul 9 14:45:00.005 2012", 15 * time.Minute, "Mon Jul 9 15:00 2012"}, - - // Round to nearest second for both. - {"Mon Jul 9 14:45:00.005 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, - } - - for _, c := range tests { - actual := Every(c.delay).Next(getTime(c.time)) - expected := getTime(c.expected) - if actual != expected { - t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.delay, expected, actual) - } - } -} diff --git a/node/cron/cron.go b/node/cron/cron.go deleted file mode 100644 index 8ba6cb9..0000000 --- a/node/cron/cron.go +++ /dev/null @@ -1,307 +0,0 @@ -// This library implements a cron spec parser and runner. See the README for -// more details. -package cron - -import ( - "fmt" - "log" - "reflect" - "runtime" - "sort" - "time" -) - -// Cron keeps track of any number of entries, invoking the associated func as -// specified by the schedule. It may be started, stopped, and the entries may -// be inspected while running. -type Cron struct { - entries []*Entry - indexes map[string]int - stop chan struct{} - add chan *Entry - del chan string - snapshot chan []*Entry - running bool - ErrorLog *log.Logger - location *time.Location -} - -// Job is an interface for submitted cron jobs. -type Job interface { - GetID() string - Run() -} - -// The Schedule describes a job's duty cycle. -type Schedule interface { - // Return the next activation time, later than the given time. - // Next is invoked initially, and then each time the job is run. - Next(time.Time) time.Time -} - -// Entry consists of a schedule and the func to execute on that schedule. -type Entry struct { - // The ID is unique for Entry - ID string - - // The schedule on which this job should be run. - Schedule Schedule - - // The next time the job will run. This is the zero time if Cron has not been - // started or this entry's schedule is unsatisfiable - Next time.Time - - // The last time this job was run. This is the zero time if the job has never - // been run. - Prev time.Time - - // The Job to run. - Job Job -} - -// byTime is a wrapper for sorting the entry array by time -// (with zero time at the end). -type byTime []*Entry - -func (s byTime) Len() int { return len(s) } -func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s byTime) Less(i, j int) bool { - // Two zero times should return false. - // Otherwise, zero is "greater" than any other time. - // (To sort it at the end of the list.) - if s[i].Next.IsZero() { - return false - } - if s[j].Next.IsZero() { - return true - } - return s[i].Next.Before(s[j].Next) -} - -// New returns a new Cron job runner, in the Local time zone. -func New() *Cron { - return NewWithLocation(time.Now().Location()) -} - -// NewWithLocation returns a new Cron job runner. -func NewWithLocation(location *time.Location) *Cron { - return &Cron{ - entries: nil, - indexes: make(map[string]int), - add: make(chan *Entry), - del: make(chan string), - stop: make(chan struct{}), - snapshot: make(chan []*Entry), - running: false, - ErrorLog: nil, - location: location, - } -} - -// A wrapper that turns a func() into a cron.Job -type FuncJob func() - -func (f FuncJob) GetID() string { - return fmt.Sprintf("pointer[%v]", reflect.ValueOf(f).Pointer()) -} -func (f FuncJob) Run() { f() } - -// AddFunc adds or updates a func to the Cron to be run on the given schedule. -func (c *Cron) AddFunc(spec string, cmd func()) error { - return c.AddJob(spec, FuncJob(cmd)) -} - -// AddJob adds or updates a Job to the Cron to be run on the given schedule. -func (c *Cron) AddJob(spec string, cmd Job) error { - schedule, err := Parse(spec) - if err != nil { - return err - } - c.Schedule(schedule, cmd) - return nil -} - -// Schedule adds or updates a Job to the Cron to be run on the given schedule. -func (c *Cron) Schedule(schedule Schedule, cmd Job) { - entry := &Entry{ - ID: cmd.GetID(), - Schedule: schedule, - Job: cmd, - } - if !c.running { - if index, ok := c.indexes[entry.ID]; ok { - c.entries[index] = entry - return - } - c.entries, c.indexes[entry.ID] = append(c.entries, entry), len(c.entries) - return - } - - c.add <- entry -} - -// DelFunc deletes a Job from the Cron. -func (c *Cron) DelFunc(cmd func()) { - c.DelJob(FuncJob(cmd)) -} - -// DelJob deletes a Job from the Cron. -func (c *Cron) DelJob(cmd Job) { - index, ok := c.indexes[cmd.GetID()] - if !ok { - return - } - - if c.running { - c.del <- cmd.GetID() - return - } - - c.entries = append(c.entries[:index], c.entries[index+1:]...) - delete(c.indexes, cmd.GetID()) - return -} - -// Entries returns a snapshot of the cron entries. -func (c *Cron) Entries() []*Entry { - if c.running { - c.snapshot <- nil - x := <-c.snapshot - return x - } - return c.entrySnapshot() -} - -// Location gets the time zone location -func (c *Cron) Location() *time.Location { - return c.location -} - -// Start the cron scheduler in its own go-routine, or no-op if already started. -func (c *Cron) Start() { - if c.running { - return - } - c.running = true - go c.run() -} - -func (c *Cron) runWithRecovery(j Job) { - defer func() { - if r := recover(); r != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - c.logf("cron: panic running job: %v\n%s", r, buf) - } - }() - j.Run() -} - -// Rebuild the indexes -func (c *Cron) reIndex() { - for i, count := 0, len(c.entries); i < count; i++ { - c.indexes[c.entries[i].ID] = i - } -} - -// Run the scheduler.. this is private just due to the need to synchronize -// access to the 'running' state variable. -func (c *Cron) run() { - // Figure out the next activation times for each entry. - now := time.Now().In(c.location) - for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) - } - - timer := time.NewTimer(time.Minute) - for { - // Determine the next entry to run. - sort.Sort(byTime(c.entries)) - c.reIndex() - - var effective time.Time - if len(c.entries) == 0 || c.entries[0].Next.IsZero() { - // If there are no entries yet, just sleep - it still handles new entries - // and stop requests. - effective = now.AddDate(10, 0, 0) - } else { - effective = c.entries[0].Next - } - - timer.Reset(effective.Sub(now)) - select { - case now = <-timer.C: - now = now.In(c.location) - // Run every entry whose next time was this effective time. - for _, e := range c.entries { - if e.Next != effective { - break - } - go c.runWithRecovery(e.Job) - e.Prev = e.Next - e.Next = e.Schedule.Next(now) - } - continue - - case newEntry := <-c.add: - if index, ok := c.indexes[newEntry.ID]; ok { - c.entries[index] = newEntry - } else { - c.entries, c.indexes[newEntry.ID] = append(c.entries, newEntry), len(c.entries) - } - newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location)) - - case id := <-c.del: - index, ok := c.indexes[id] - if !ok { - continue - } - - c.entries = append(c.entries[:index], c.entries[index+1:]...) - delete(c.indexes, id) - - case <-c.snapshot: - c.snapshot <- c.entrySnapshot() - - case <-c.stop: - timer.Stop() - return - } - - // 'now' should be updated after newEntry and snapshot cases. - now = time.Now().In(c.location) - } -} - -// Logs an error to stderr or to the configured error log -func (c *Cron) logf(format string, args ...interface{}) { - if c.ErrorLog != nil { - c.ErrorLog.Printf(format, args...) - } else { - log.Printf(format, args...) - } -} - -// Stop stops the cron scheduler if it is running; otherwise it does nothing. -func (c *Cron) Stop() { - if !c.running { - return - } - c.stop <- struct{}{} - c.running = false -} - -// entrySnapshot returns a copy of the current cron entry list. -func (c *Cron) entrySnapshot() []*Entry { - entries := []*Entry{} - for _, e := range c.entries { - entries = append(entries, &Entry{ - Schedule: e.Schedule, - Next: e.Next, - Prev: e.Prev, - Job: e.Job, - }) - } - return entries -} diff --git a/node/cron/cron_test.go b/node/cron/cron_test.go deleted file mode 100644 index dd551d0..0000000 --- a/node/cron/cron_test.go +++ /dev/null @@ -1,347 +0,0 @@ -package cron - -import ( - "fmt" - "reflect" - "sync" - "sync/atomic" - "testing" - "time" -) - -// Many tests schedule a job for every second, and then wait at most a second -// for it to run. This amount is just slightly larger than 1 second to -// compensate for a few milliseconds of runtime. -const ONE_SECOND = 1*time.Second + 10*time.Millisecond - -func TestFuncPanicRecovery(t *testing.T) { - cron := New() - cron.Start() - defer cron.Stop() - cron.AddFunc("* * * * * ?", func() { panic("YOLO") }) - - select { - case <-time.After(ONE_SECOND): - return - } -} - -type DummyJob struct{} - -func (d DummyJob) GetID() string { - return fmt.Sprintf("pointer[%v]", reflect.ValueOf(&d).Pointer()) -} - -func (d DummyJob) Run() { - panic("YOLO") -} - -func TestJobPanicRecovery(t *testing.T) { - var job DummyJob - - cron := New() - cron.Start() - defer cron.Stop() - cron.AddJob("* * * * * ?", job) - - select { - case <-time.After(ONE_SECOND): - return - } -} - -// Start and stop cron with no entries. -func TestNoEntries(t *testing.T) { - cron := New() - cron.Start() - - select { - case <-time.After(ONE_SECOND): - t.FailNow() - case <-stop(cron): - } -} - -// Start, stop, then add an entry. Verify entry doesn't run. -func TestStopCausesJobsToNotRun(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - - cron := New() - cron.Start() - cron.Stop() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - - select { - case <-time.After(ONE_SECOND): - // No job ran! - case <-wait(wg): - t.FailNow() - } -} - -// Add a job, start cron, expect it runs. -func TestAddBeforeRunning(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - - cron := New() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - cron.Start() - defer cron.Stop() - - // Give cron 2 seconds to run our job (which is always activated). - select { - case <-time.After(ONE_SECOND): - t.FailNow() - case <-wait(wg): - } -} - -// Start cron, add a job, expect it runs. -func TestAddWhileRunning(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - - cron := New() - cron.Start() - defer cron.Stop() - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - - select { - case <-time.After(ONE_SECOND): - t.FailNow() - case <-wait(wg): - } -} - -// Test for #34. Adding a job after calling start results in multiple job invocations -func TestAddWhileRunningWithDelay(t *testing.T) { - cron := New() - cron.Start() - defer cron.Stop() - time.Sleep(5 * time.Second) - var calls int32 = 0 - cron.AddFunc("* * * * * *", func() { atomic.AddInt32(&calls, 1) }) - - <-time.After(ONE_SECOND) - if atomic.LoadInt32(&calls) != 1 { - fmt.Printf("called %d times, expected 1\n", atomic.LoadInt32(&calls)) - t.Fail() - } -} - -// Test timing with Entries. -func TestSnapshotEntries(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - - cron := New() - cron.AddFunc("@every 2s", func() { wg.Done() }) - cron.Start() - defer cron.Stop() - - // Cron should fire in 2 seconds. After 1 second, call Entries. - select { - case <-time.After(ONE_SECOND): - cron.Entries() - } - - // Even though Entries was called, the cron should fire at the 2 second mark. - select { - case <-time.After(ONE_SECOND): - t.FailNow() - case <-wait(wg): - } - -} - -// Test that the entries are correctly sorted. -// Add a bunch of long-in-the-future entries, and an immediate entry, and ensure -// that the immediate entry runs immediately. -// Also: Test that multiple jobs run in the same instant. -func TestMultipleEntries(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(2) - - cron := New() - cron.AddFunc("0 0 0 1 1 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - cron.AddFunc("0 0 0 31 12 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - - cron.Start() - defer cron.Stop() - - select { - case <-time.After(ONE_SECOND): - t.FailNow() - case <-wait(wg): - } -} - -// Test running the same job twice. -func TestRunningJobTwice(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(2) - - cron := New() - cron.AddFunc("0 0 0 1 1 ?", func() {}) - cron.AddFunc("0 0 0 31 12 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - - cron.Start() - defer cron.Stop() - - select { - case <-time.After(2 * ONE_SECOND): - t.FailNow() - case <-wait(wg): - } -} - -func TestRunningMultipleSchedules(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(2) - - cron := New() - cron.AddFunc("0 0 0 1 1 ?", func() {}) - cron.AddFunc("0 0 0 31 12 ?", func() {}) - cron.AddFunc("* * * * * ?", func() { wg.Done() }) - cron.Schedule(Every(time.Minute), FuncJob(func() {})) - cron.Schedule(Every(time.Second), FuncJob(func() { wg.Done() })) - cron.Schedule(Every(time.Hour), FuncJob(func() {})) - - cron.Start() - defer cron.Stop() - - select { - case <-time.After(2 * ONE_SECOND): - t.FailNow() - case <-wait(wg): - } -} - -// Test that the cron is run in the local time zone (as opposed to UTC). -func TestLocalTimezone(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(2) - - now := time.Now().Local() - spec := fmt.Sprintf("%d,%d %d %d %d %d ?", - now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) - - cron := New() - cron.AddFunc(spec, func() { wg.Done() }) - cron.Start() - defer cron.Stop() - - select { - case <-time.After(ONE_SECOND * 3): - t.FailNow() - case <-wait(wg): - } -} - -// Test that the cron is run in the given time zone (as opposed to local). -func TestNonLocalTimezone(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(2) - - loc, err := time.LoadLocation("Atlantic/Cape_Verde") - if err != nil { - fmt.Printf("Failed to load time zone Atlantic/Cape_Verde: %+v", err) - t.Fail() - } - - now := time.Now().In(loc) - spec := fmt.Sprintf("%d,%d %d %d %d %d ?", - now.Second()+1, now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) - - cron := NewWithLocation(loc) - cron.AddFunc(spec, func() { wg.Done() }) - cron.Start() - defer cron.Stop() - - select { - case <-time.After(ONE_SECOND * 2): - t.FailNow() - case <-wait(wg): - } -} - -// Test that calling stop before start silently returns without -// blocking the stop channel. -func TestStopWithoutStart(t *testing.T) { - cron := New() - cron.Stop() -} - -type testJob struct { - wg *sync.WaitGroup - name string -} - -func (t testJob) GetID() string { - return t.name -} - -func (t testJob) Run() { - t.wg.Done() -} - -// Simple test using Runnables. -func TestJob(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - - cron := New() - cron.AddJob("0 0 0 30 Feb ?", testJob{wg, "job0"}) - cron.AddJob("0 0 0 1 1 ?", testJob{wg, "job1"}) - cron.AddJob("* * * * * ?", testJob{wg, "job2"}) - cron.AddJob("1 0 0 1 1 ?", testJob{wg, "job3"}) - cron.Schedule(Every(5*time.Second+5*time.Nanosecond), testJob{wg, "job4"}) - cron.Schedule(Every(5*time.Minute), testJob{wg, "job5"}) - - cron.Start() - defer cron.Stop() - - select { - case <-time.After(ONE_SECOND): - t.FailNow() - case <-wait(wg): - } - - // Ensure the entries are in the right order. - expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"} - - var actuals []string - for _, entry := range cron.Entries() { - actuals = append(actuals, entry.Job.(testJob).name) - } - - for i, expected := range expecteds { - if actuals[i] != expected { - t.Errorf("Jobs not in the right order. (expected) %s != %s (actual)", expecteds, actuals) - t.FailNow() - } - } -} - -func wait(wg *sync.WaitGroup) chan bool { - ch := make(chan bool) - go func() { - wg.Wait() - ch <- true - }() - return ch -} - -func stop(cron *Cron) chan bool { - ch := make(chan bool) - go func() { - cron.Stop() - ch <- true - }() - return ch -} diff --git a/node/cron/doc.go b/node/cron/doc.go deleted file mode 100644 index dbdf501..0000000 --- a/node/cron/doc.go +++ /dev/null @@ -1,129 +0,0 @@ -/* -Package cron implements a cron spec parser and job runner. - -Usage - -Callers may register Funcs to be invoked on a given schedule. Cron will run -them in their own goroutines. - - c := cron.New() - c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") }) - c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) - c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") }) - c.Start() - .. - // Funcs are invoked in their own goroutine, asynchronously. - ... - // Funcs may also be added to a running Cron - c.AddFunc("@daily", func() { fmt.Println("Every day") }) - .. - // Inspect the cron job entries' next and previous run times. - inspect(c.Entries()) - .. - c.Stop() // Stop the scheduler (does not stop any jobs already running). - -CRON Expression Format - -A cron expression represents a set of times, using 6 space-separated fields. - - Field name | Mandatory? | Allowed values | Allowed special characters - ---------- | ---------- | -------------- | -------------------------- - Seconds | Yes | 0-59 | * / , - - Minutes | Yes | 0-59 | * / , - - Hours | Yes | 0-23 | * / , - - Day of month | Yes | 1-31 | * / , - ? - Month | Yes | 1-12 or JAN-DEC | * / , - - Day of week | Yes | 0-6 or SUN-SAT | * / , - ? - -Note: Month and Day-of-week field values are case insensitive. "SUN", "Sun", -and "sun" are equally accepted. - -Special Characters - -Asterisk ( * ) - -The asterisk indicates that the cron expression will match for all values of the -field; e.g., using an asterisk in the 5th field (month) would indicate every -month. - -Slash ( / ) - -Slashes are used to describe increments of ranges. For example 3-59/15 in the -1st field (minutes) would indicate the 3rd minute of the hour and every 15 -minutes thereafter. The form "*\/..." is equivalent to the form "first-last/...", -that is, an increment over the largest possible range of the field. The form -"N/..." is accepted as meaning "N-MAX/...", that is, starting at N, use the -increment until the end of that specific range. It does not wrap around. - -Comma ( , ) - -Commas are used to separate items of a list. For example, using "MON,WED,FRI" in -the 5th field (day of week) would mean Mondays, Wednesdays and Fridays. - -Hyphen ( - ) - -Hyphens are used to define ranges. For example, 9-17 would indicate every -hour between 9am and 5pm inclusive. - -Question mark ( ? ) - -Question mark may be used instead of '*' for leaving either day-of-month or -day-of-week blank. - -Predefined schedules - -You may use one of several pre-defined schedules in place of a cron expression. - - Entry | Description | Equivalent To - ----- | ----------- | ------------- - @yearly (or @annually) | Run once a year, midnight, Jan. 1st | 0 0 0 1 1 * - @monthly | Run once a month, midnight, first of month | 0 0 0 1 * * - @weekly | Run once a week, midnight on Sunday | 0 0 0 * * 0 - @daily (or @midnight) | Run once a day, midnight | 0 0 0 * * * - @hourly | Run once an hour, beginning of hour | 0 0 * * * * - -Intervals - -You may also schedule a job to execute at fixed intervals. This is supported by -formatting the cron spec like this: - - @every - -where "duration" is a string accepted by time.ParseDuration -(http://golang.org/pkg/time/#ParseDuration). - -For example, "@every 1h30m10s" would indicate a schedule that activates every -1 hour, 30 minutes, 10 seconds. - -Note: The interval does not take the job runtime into account. For example, -if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, -it will have only 2 minutes of idle time between each run. - -Time zones - -All interpretation and scheduling is done in the machine's local time zone (as -provided by the Go time package (http://www.golang.org/pkg/time). - -Be aware that jobs scheduled during daylight-savings leap-ahead transitions will -not be run! - -Thread safety - -Since the Cron service runs concurrently with the calling code, some amount of -care must be taken to ensure proper synchronization. - -All cron methods are designed to be correctly synchronized as long as the caller -ensures that invocations have a clear happens-before ordering between them. - -Implementation - -Cron entries are stored in an array, sorted by their next activation time. Cron -sleeps until the next job is due to be run. - -Upon waking: - - it runs each entry that is active on that second - - it calculates the next run times for the jobs that were run - - it re-sorts the array of entries by next activation time. - - it goes to sleep until the soonest job. -*/ -package cron diff --git a/node/cron/parser.go b/node/cron/parser.go deleted file mode 100644 index 3bbd2a3..0000000 --- a/node/cron/parser.go +++ /dev/null @@ -1,393 +0,0 @@ -package cron - -import ( - "fmt" - "math" - "strconv" - "strings" - "time" -) - -// Configuration options for creating a parser. Most options specify which -// fields should be included, while others enable features. If a field is not -// included the parser will assume a default value. These options do not change -// the order fields are parse in. -type ParseOption int - -const ( - Second ParseOption = 1 << iota // Seconds field, default 0 - Minute // Minutes field, default 0 - Hour // Hours field, default 0 - Dom // Day of month field, default * - Month // Month field, default * - Dow // Day of week field, default * - DowOptional // Optional day of week field, default * - Descriptor // Allow descriptors such as @monthly, @weekly, etc. -) - -var places = []ParseOption{ - Second, - Minute, - Hour, - Dom, - Month, - Dow, -} - -var defaults = []string{ - "0", - "0", - "0", - "*", - "*", - "*", -} - -// A custom Parser that can be configured. -type Parser struct { - options ParseOption - optionals int -} - -// Creates a custom Parser with custom options. -// -// // Standard parser without descriptors -// specParser := NewParser(Minute | Hour | Dom | Month | Dow) -// sched, err := specParser.Parse("0 0 15 */3 *") -// -// // Same as above, just excludes time fields -// subsParser := NewParser(Dom | Month | Dow) -// sched, err := specParser.Parse("15 */3 *") -// -// // Same as above, just makes Dow optional -// subsParser := NewParser(Dom | Month | DowOptional) -// sched, err := specParser.Parse("15 */3") -// -func NewParser(options ParseOption) Parser { - optionals := 0 - if options&DowOptional > 0 { - options |= Dow - optionals++ - } - return Parser{options, optionals} -} - -// Parse returns a new crontab schedule representing the given spec. -// It returns a descriptive error if the spec is not valid. -// It accepts crontab specs and features configured by NewParser. -func (p Parser) Parse(spec string) (Schedule, error) { - if spec[0] == '@' && p.options&Descriptor > 0 { - return parseDescriptor(spec) - } - - // Figure out how many fields we need - max := 0 - for _, place := range places { - if p.options&place > 0 { - max++ - } - } - min := max - p.optionals - - // Split fields on whitespace - fields := strings.Fields(spec) - - // Validate number of fields - if count := len(fields); count < min || count > max { - if min == max { - return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec) - } - return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec) - } - - // Fill in missing fields - fields = expandFields(fields, p.options) - - var err error - field := func(field string, r bounds) uint64 { - if err != nil { - return 0 - } - var bits uint64 - bits, err = getField(field, r) - return bits - } - - var ( - second = field(fields[0], seconds) - minute = field(fields[1], minutes) - hour = field(fields[2], hours) - dayofmonth = field(fields[3], dom) - month = field(fields[4], months) - dayofweek = field(fields[5], dow) - ) - if err != nil { - return nil, err - } - - return &SpecSchedule{ - Second: second, - Minute: minute, - Hour: hour, - Dom: dayofmonth, - Month: month, - Dow: dayofweek, - }, nil -} - -func expandFields(fields []string, options ParseOption) []string { - n := 0 - count := len(fields) - expFields := make([]string, len(places)) - copy(expFields, defaults) - for i, place := range places { - if options&place > 0 { - expFields[i] = fields[n] - n++ - } - if n == count { - break - } - } - return expFields -} - -var standardParser = NewParser( - Minute | Hour | Dom | Month | Dow | Descriptor, -) - -// ParseStandard returns a new crontab schedule representing the given standardSpec -// (https://en.wikipedia.org/wiki/Cron). It differs from Parse requiring to always -// pass 5 entries representing: minute, hour, day of month, month and day of week, -// in that order. It returns a descriptive error if the spec is not valid. -// -// It accepts -// - Standard crontab specs, e.g. "* * * * ?" -// - Descriptors, e.g. "@midnight", "@every 1h30m" -func ParseStandard(standardSpec string) (Schedule, error) { - return standardParser.Parse(standardSpec) -} - -var defaultParser = NewParser( - Second | Minute | Hour | Dom | Month | DowOptional | Descriptor, -) - -// Parse returns a new crontab schedule representing the given spec. -// It returns a descriptive error if the spec is not valid. -// -// It accepts -// - Full crontab specs, e.g. "* * * * * ?" -// - Descriptors, e.g. "@midnight", "@every 1h30m" -func Parse(spec string) (Schedule, error) { - return defaultParser.Parse(spec) -} - -// getField returns an Int with the bits set representing all of the times that -// the field represents or error parsing field value. A "field" is a comma-separated -// list of "ranges". -func getField(field string, r bounds) (uint64, error) { - var bits uint64 - ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' }) - for _, expr := range ranges { - bit, err := getRange(expr, r) - if err != nil { - return bits, err - } - bits |= bit - } - return bits, nil -} - -// getRange returns the bits indicated by the given expression: -// number | number "-" number [ "/" number ] -// or error parsing range. -func getRange(expr string, r bounds) (uint64, error) { - var ( - start, end, step uint - rangeAndStep = strings.Split(expr, "/") - lowAndHigh = strings.Split(rangeAndStep[0], "-") - singleDigit = len(lowAndHigh) == 1 - err error - ) - - var extra uint64 - if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { - start = r.min - end = r.max - extra = starBit - } else { - start, err = parseIntOrName(lowAndHigh[0], r.names) - if err != nil { - return 0, err - } - switch len(lowAndHigh) { - case 1: - end = start - case 2: - end, err = parseIntOrName(lowAndHigh[1], r.names) - if err != nil { - return 0, err - } - default: - return 0, fmt.Errorf("Too many hyphens: %s", expr) - } - } - - switch len(rangeAndStep) { - case 1: - step = 1 - case 2: - step, err = mustParseInt(rangeAndStep[1]) - if err != nil { - return 0, err - } - - // Special handling: "N/step" means "N-max/step". - if singleDigit { - end = r.max - } - default: - return 0, fmt.Errorf("Too many slashes: %s", expr) - } - - if start < r.min { - return 0, fmt.Errorf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr) - } - if end > r.max { - return 0, fmt.Errorf("End of range (%d) above maximum (%d): %s", end, r.max, expr) - } - if start > end { - return 0, fmt.Errorf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr) - } - if step == 0 { - return 0, fmt.Errorf("Step of range should be a positive number: %s", expr) - } - - return getBits(start, end, step) | extra, nil -} - -// parseIntOrName returns the (possibly-named) integer contained in expr. -func parseIntOrName(expr string, names map[string]uint) (uint, error) { - if names != nil { - if namedInt, ok := names[strings.ToLower(expr)]; ok { - return namedInt, nil - } - } - return mustParseInt(expr) -} - -// mustParseInt parses the given expression as an int or returns an error. -func mustParseInt(expr string) (uint, error) { - num, err := strconv.Atoi(expr) - if err != nil { - return 0, fmt.Errorf("Failed to parse int from %s: %s", expr, err) - } - if num < 0 { - return 0, fmt.Errorf("Negative number (%d) not allowed: %s", num, expr) - } - - return uint(num), nil -} - -// getBits sets all bits in the range [min, max], modulo the given step size. -func getBits(min, max, step uint) uint64 { - var bits uint64 - - // If step is 1, use shifts. - if step == 1 { - return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min) - } - - // Else, use a simple loop. - for i := min; i <= max; i += step { - bits |= 1 << i - } - return bits -} - -// all returns all bits within the given bounds. (plus the star bit) -func all(r bounds) uint64 { - return getBits(r.min, r.max, 1) | starBit -} - -// parseDescriptor returns a predefined schedule for the expression, or error if none matches. -func parseDescriptor(descriptor string) (Schedule, error) { - switch descriptor { - case "@yearly", "@annually": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: 1 << months.min, - Dow: all(dow), - }, nil - - case "@monthly": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: all(months), - Dow: all(dow), - }, nil - - case "@weekly": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: all(dom), - Month: all(months), - Dow: 1 << dow.min, - }, nil - - case "@daily", "@midnight": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: all(dom), - Month: all(months), - Dow: all(dow), - }, nil - - case "@hourly": - return &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: all(hours), - Dom: all(dom), - Month: all(months), - Dow: all(dow), - }, nil - } - - const every = "@every " - if strings.HasPrefix(descriptor, every) { - duration, err := time.ParseDuration(descriptor[len(every):]) - if err != nil { - return nil, fmt.Errorf("Failed to parse duration %s: %s", descriptor, err) - } - return Every(duration), nil - } - - const at = "@at " - if strings.HasPrefix(descriptor, at) { - tss := strings.Split(descriptor[len(at):], ",") - atls := make([]time.Time, 0, len(tss)) - for _, ts := range tss { - ts = strings.TrimSpace(ts) - att, err := time.ParseInLocation("2006-01-02 15:04:05", ts, time.Local) - if err != nil { - return nil, fmt.Errorf("Failed to parse time %s: %s", descriptor, err) - } - atls = append(atls, att) - } - - return At(atls), nil - } - - return nil, fmt.Errorf("Unrecognized descriptor: %s", descriptor) -} diff --git a/node/cron/parser_test.go b/node/cron/parser_test.go deleted file mode 100644 index 099110e..0000000 --- a/node/cron/parser_test.go +++ /dev/null @@ -1,230 +0,0 @@ -package cron - -import ( - "reflect" - "strings" - "testing" - "time" -) - -func TestRange(t *testing.T) { - zero := uint64(0) - ranges := []struct { - expr string - min, max uint - expected uint64 - err string - }{ - {"5", 0, 7, 1 << 5, ""}, - {"0", 0, 7, 1 << 0, ""}, - {"7", 0, 7, 1 << 7, ""}, - - {"5-5", 0, 7, 1 << 5, ""}, - {"5-6", 0, 7, 1<<5 | 1<<6, ""}, - {"5-7", 0, 7, 1<<5 | 1<<6 | 1<<7, ""}, - - {"5-6/2", 0, 7, 1 << 5, ""}, - {"5-7/2", 0, 7, 1<<5 | 1<<7, ""}, - {"5-7/1", 0, 7, 1<<5 | 1<<6 | 1<<7, ""}, - - {"*", 1, 3, 1<<1 | 1<<2 | 1<<3 | starBit, ""}, - {"*/2", 1, 3, 1<<1 | 1<<3 | starBit, ""}, - - {"5--5", 0, 0, zero, "Too many hyphens"}, - {"jan-x", 0, 0, zero, "Failed to parse int from"}, - {"2-x", 1, 5, zero, "Failed to parse int from"}, - {"*/-12", 0, 0, zero, "Negative number"}, - {"*//2", 0, 0, zero, "Too many slashes"}, - {"1", 3, 5, zero, "below minimum"}, - {"6", 3, 5, zero, "above maximum"}, - {"5-3", 3, 5, zero, "beyond end of range"}, - {"*/0", 0, 0, zero, "should be a positive number"}, - } - - for _, c := range ranges { - actual, err := getRange(c.expr, bounds{c.min, c.max, nil}) - if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { - t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) - } - if len(c.err) == 0 && err != nil { - t.Errorf("%s => unexpected error %v", c.expr, err) - } - if actual != c.expected { - t.Errorf("%s => expected %d, got %d", c.expr, c.expected, actual) - } - } -} - -func TestField(t *testing.T) { - fields := []struct { - expr string - min, max uint - expected uint64 - }{ - {"5", 1, 7, 1 << 5}, - {"5,6", 1, 7, 1<<5 | 1<<6}, - {"5,6,7", 1, 7, 1<<5 | 1<<6 | 1<<7}, - {"1,5-7/2,3", 1, 7, 1<<1 | 1<<5 | 1<<7 | 1<<3}, - } - - for _, c := range fields { - actual, _ := getField(c.expr, bounds{c.min, c.max, nil}) - if actual != c.expected { - t.Errorf("%s => expected %d, got %d", c.expr, c.expected, actual) - } - } -} - -func TestAll(t *testing.T) { - allBits := []struct { - r bounds - expected uint64 - }{ - {minutes, 0xfffffffffffffff}, // 0-59: 60 ones - {hours, 0xffffff}, // 0-23: 24 ones - {dom, 0xfffffffe}, // 1-31: 31 ones, 1 zero - {months, 0x1ffe}, // 1-12: 12 ones, 1 zero - {dow, 0x7f}, // 0-6: 7 ones - } - - for _, c := range allBits { - actual := all(c.r) // all() adds the starBit, so compensate for that.. - if c.expected|starBit != actual { - t.Errorf("%d-%d/%d => expected %b, got %b", - c.r.min, c.r.max, 1, c.expected|starBit, actual) - } - } -} - -func TestBits(t *testing.T) { - bits := []struct { - min, max, step uint - expected uint64 - }{ - {0, 0, 1, 0x1}, - {1, 1, 1, 0x2}, - {1, 5, 2, 0x2a}, // 101010 - {1, 4, 2, 0xa}, // 1010 - } - - for _, c := range bits { - actual := getBits(c.min, c.max, c.step) - if c.expected != actual { - t.Errorf("%d-%d/%d => expected %b, got %b", - c.min, c.max, c.step, c.expected, actual) - } - } -} - -func TestParse(t *testing.T) { - entries := []struct { - expr string - expected Schedule - err string - }{ - { - expr: "* 5 * * * *", - expected: &SpecSchedule{ - Second: all(seconds), - Minute: 1 << 5, - Hour: all(hours), - Dom: all(dom), - Month: all(months), - Dow: all(dow), - }, - }, - { - expr: "* 5 j * * *", - err: "Failed to parse int from", - }, - { - expr: "@every 5m", - expected: ConstantDelaySchedule{Delay: time.Duration(5) * time.Minute}, - }, - { - expr: "@every Xm", - err: "Failed to parse duration", - }, - { - expr: "@yearly", - expected: &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: 1 << months.min, - Dow: all(dow), - }, - }, - { - expr: "@annually", - expected: &SpecSchedule{ - Second: 1 << seconds.min, - Minute: 1 << minutes.min, - Hour: 1 << hours.min, - Dom: 1 << dom.min, - Month: 1 << months.min, - Dow: all(dow), - }, - }, - { - expr: "@unrecognized", - err: "Unrecognized descriptor", - }, - { - expr: "* * * *", - err: "Expected 5 to 6 fields", - }, - } - - for _, c := range entries { - actual, err := Parse(c.expr) - if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { - t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) - } - if len(c.err) == 0 && err != nil { - t.Errorf("%s => unexpected error %v", c.expr, err) - } - if !reflect.DeepEqual(actual, c.expected) { - t.Errorf("%s => expected %v, got %v", c.expr, c.expected, actual) - } - } -} - -func TestStandardSpecSchedule(t *testing.T) { - entries := []struct { - expr string - expected Schedule - err string - }{ - { - expr: "5 * * * *", - expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow)}, - }, - { - expr: "@every 5m", - expected: ConstantDelaySchedule{time.Duration(5) * time.Minute}, - }, - { - expr: "5 j * * *", - err: "Failed to parse int from", - }, - { - expr: "* * * *", - err: "Expected exactly 5 fields", - }, - } - - for _, c := range entries { - actual, err := ParseStandard(c.expr) - if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { - t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) - } - if len(c.err) == 0 && err != nil { - t.Errorf("%s => unexpected error %v", c.expr, err) - } - if !reflect.DeepEqual(actual, c.expected) { - t.Errorf("%s => expected %v, got %v", c.expr, c.expected, actual) - } - } -} diff --git a/node/cron/spec.go b/node/cron/spec.go deleted file mode 100644 index aac9a60..0000000 --- a/node/cron/spec.go +++ /dev/null @@ -1,158 +0,0 @@ -package cron - -import "time" - -// SpecSchedule specifies a duty cycle (to the second granularity), based on a -// traditional crontab specification. It is computed initially and stored as bit sets. -type SpecSchedule struct { - Second, Minute, Hour, Dom, Month, Dow uint64 -} - -// bounds provides a range of acceptable values (plus a map of name to value). -type bounds struct { - min, max uint - names map[string]uint -} - -// The bounds for each field. -var ( - seconds = bounds{0, 59, nil} - minutes = bounds{0, 59, nil} - hours = bounds{0, 23, nil} - dom = bounds{1, 31, nil} - months = bounds{1, 12, map[string]uint{ - "jan": 1, - "feb": 2, - "mar": 3, - "apr": 4, - "may": 5, - "jun": 6, - "jul": 7, - "aug": 8, - "sep": 9, - "oct": 10, - "nov": 11, - "dec": 12, - }} - dow = bounds{0, 6, map[string]uint{ - "sun": 0, - "mon": 1, - "tue": 2, - "wed": 3, - "thu": 4, - "fri": 5, - "sat": 6, - }} -) - -const ( - // Set the top bit if a star was included in the expression. - starBit = 1 << 63 -) - -// Next returns the next time this schedule is activated, greater than the given -// time. If no time can be found to satisfy the schedule, return the zero time. -func (s *SpecSchedule) Next(t time.Time) time.Time { - // General approach: - // For Month, Day, Hour, Minute, Second: - // Check if the time value matches. If yes, continue to the next field. - // If the field doesn't match the schedule, then increment the field until it matches. - // While incrementing the field, a wrap-around brings it back to the beginning - // of the field list (since it is necessary to re-verify previous field - // values) - - // Start at the earliest possible time (the upcoming second). - t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) - - // This flag indicates whether a field has been incremented. - added := false - - // If no time is found within five years, return zero. - yearLimit := t.Year() + 5 - -WRAP: - if t.Year() > yearLimit { - return time.Time{} - } - - // Find the first applicable month. - // If it's this month, then do nothing. - for 1< 0 - dowMatch bool = 1< 0 - ) - if s.Dom&starBit > 0 || s.Dow&starBit > 0 { - return domMatch && dowMatch - } - return domMatch || dowMatch -} diff --git a/node/cron/spec_test.go b/node/cron/spec_test.go deleted file mode 100644 index a1be616..0000000 --- a/node/cron/spec_test.go +++ /dev/null @@ -1,249 +0,0 @@ -package cron - -import ( - "testing" - "time" -) - -func TestActivation(t *testing.T) { - tests := []struct { - time, spec string - expected bool - }{ - // Every fifteen minutes. - {"Mon Jul 9 15:00 2012", "0 0/15 * * *", true}, - {"Mon Jul 9 15:45 2012", "0 0/15 * * *", true}, - {"Mon Jul 9 15:40 2012", "0 0/15 * * *", false}, - - // Every fifteen minutes, starting at 5 minutes. - {"Mon Jul 9 15:05 2012", "0 5/15 * * *", true}, - {"Mon Jul 9 15:20 2012", "0 5/15 * * *", true}, - {"Mon Jul 9 15:50 2012", "0 5/15 * * *", true}, - - // Named months - {"Sun Jul 15 15:00 2012", "0 0/15 * * Jul", true}, - {"Sun Jul 15 15:00 2012", "0 0/15 * * Jun", false}, - - // Everything set. - {"Sun Jul 15 08:30 2012", "0 30 08 ? Jul Sun", true}, - {"Sun Jul 15 08:30 2012", "0 30 08 15 Jul ?", true}, - {"Mon Jul 16 08:30 2012", "0 30 08 ? Jul Sun", false}, - {"Mon Jul 16 08:30 2012", "0 30 08 15 Jul ?", false}, - - // Predefined schedules - {"Mon Jul 9 15:00 2012", "@hourly", true}, - {"Mon Jul 9 15:04 2012", "@hourly", false}, - {"Mon Jul 9 15:00 2012", "@daily", false}, - {"Mon Jul 9 00:00 2012", "@daily", true}, - {"Mon Jul 9 00:00 2012", "@weekly", false}, - {"Sun Jul 8 00:00 2012", "@weekly", true}, - {"Sun Jul 8 01:00 2012", "@weekly", false}, - {"Sun Jul 8 00:00 2012", "@monthly", false}, - {"Sun Jul 1 00:00 2012", "@monthly", true}, - - // Test interaction of DOW and DOM. - // If both are specified, then only one needs to match. - {"Sun Jul 15 00:00 2012", "0 * * 1,15 * Sun", true}, - {"Fri Jun 15 00:00 2012", "0 * * 1,15 * Sun", true}, - {"Wed Aug 1 00:00 2012", "0 * * 1,15 * Sun", true}, - - // However, if one has a star, then both need to match. - {"Sun Jul 15 00:00 2012", "0 * * * * Mon", false}, - {"Sun Jul 15 00:00 2012", "0 * * */10 * Sun", false}, - {"Mon Jul 9 00:00 2012", "0 * * 1,15 * *", false}, - {"Sun Jul 15 00:00 2012", "0 * * 1,15 * *", true}, - {"Sun Jul 15 00:00 2012", "0 * * */2 * Sun", true}, - } - - for _, test := range tests { - sched, err := Parse(test.spec) - if err != nil { - t.Error(err) - continue - } - actual := sched.Next(getTime(test.time).Add(-1 * time.Second)) - expected := getTime(test.time) - if test.expected && expected != actual || !test.expected && expected == actual { - t.Errorf("Fail evaluating %s on %s: (expected) %s != %s (actual)", - test.spec, test.time, expected, actual) - } - } -} - -func TestNext(t *testing.T) { - runs := []struct { - time, spec string - expected string - }{ - // Simple cases - {"Mon Jul 9 14:45 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, - {"Mon Jul 9 14:59 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, - {"Mon Jul 9 14:59:59 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, - - // Wrap around hours - {"Mon Jul 9 15:45 2012", "0 20-35/15 * * *", "Mon Jul 9 16:20 2012"}, - - // Wrap around days - {"Mon Jul 9 23:46 2012", "0 */15 * * *", "Tue Jul 10 00:00 2012"}, - {"Mon Jul 9 23:45 2012", "0 20-35/15 * * *", "Tue Jul 10 00:20 2012"}, - {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * * *", "Tue Jul 10 00:20:15 2012"}, - {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 1/2 * *", "Tue Jul 10 01:20:15 2012"}, - {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 10-12 * *", "Tue Jul 10 10:20:15 2012"}, - - {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 1/2 */2 * *", "Thu Jul 11 01:20:15 2012"}, - {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * 9-20 * *", "Wed Jul 10 00:20:15 2012"}, - {"Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * 9-20 Jul *", "Wed Jul 10 00:20:15 2012"}, - - // Wrap around months - {"Mon Jul 9 23:35 2012", "0 0 0 9 Apr-Oct ?", "Thu Aug 9 00:00 2012"}, - {"Mon Jul 9 23:35 2012", "0 0 0 */5 Apr,Aug,Oct Mon", "Mon Aug 6 00:00 2012"}, - {"Mon Jul 9 23:35 2012", "0 0 0 */5 Oct Mon", "Mon Oct 1 00:00 2012"}, - - // Wrap around years - {"Mon Jul 9 23:35 2012", "0 0 0 * Feb Mon", "Mon Feb 4 00:00 2013"}, - {"Mon Jul 9 23:35 2012", "0 0 0 * Feb Mon/2", "Fri Feb 1 00:00 2013"}, - - // Wrap around minute, hour, day, month, and year - {"Mon Dec 31 23:59:45 2012", "0 * * * * *", "Tue Jan 1 00:00:00 2013"}, - - // Leap year - {"Mon Jul 9 23:35 2012", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016"}, - - // Daylight savings time 2am EST (-5) -> 3am EDT (-4) - {"2012-03-11T00:00:00-0500", "0 30 2 11 Mar ?", "2013-03-11T02:30:00-0400"}, - - // hourly job - {"2012-03-11T00:00:00-0500", "0 0 * * * ?", "2012-03-11T01:00:00-0500"}, - {"2012-03-11T01:00:00-0500", "0 0 * * * ?", "2012-03-11T03:00:00-0400"}, - {"2012-03-11T03:00:00-0400", "0 0 * * * ?", "2012-03-11T04:00:00-0400"}, - {"2012-03-11T04:00:00-0400", "0 0 * * * ?", "2012-03-11T05:00:00-0400"}, - - // 1am nightly job - {"2012-03-11T00:00:00-0500", "0 0 1 * * ?", "2012-03-11T01:00:00-0500"}, - {"2012-03-11T01:00:00-0500", "0 0 1 * * ?", "2012-03-12T01:00:00-0400"}, - - // 2am nightly job (skipped) - {"2012-03-11T00:00:00-0500", "0 0 2 * * ?", "2012-03-12T02:00:00-0400"}, - - // Daylight savings time 2am EDT (-4) => 1am EST (-5) - {"2012-11-04T00:00:00-0400", "0 30 2 04 Nov ?", "2012-11-04T02:30:00-0500"}, - {"2012-11-04T01:45:00-0400", "0 30 1 04 Nov ?", "2012-11-04T01:30:00-0500"}, - - // hourly job - {"2012-11-04T00:00:00-0400", "0 0 * * * ?", "2012-11-04T01:00:00-0400"}, - {"2012-11-04T01:00:00-0400", "0 0 * * * ?", "2012-11-04T01:00:00-0500"}, - {"2012-11-04T01:00:00-0500", "0 0 * * * ?", "2012-11-04T02:00:00-0500"}, - - // 1am nightly job (runs twice) - {"2012-11-04T00:00:00-0400", "0 0 1 * * ?", "2012-11-04T01:00:00-0400"}, - {"2012-11-04T01:00:00-0400", "0 0 1 * * ?", "2012-11-04T01:00:00-0500"}, - {"2012-11-04T01:00:00-0500", "0 0 1 * * ?", "2012-11-05T01:00:00-0500"}, - - // 2am nightly job - {"2012-11-04T00:00:00-0400", "0 0 2 * * ?", "2012-11-04T02:00:00-0500"}, - {"2012-11-04T02:00:00-0500", "0 0 2 * * ?", "2012-11-05T02:00:00-0500"}, - - // 3am nightly job - {"2012-11-04T00:00:00-0400", "0 0 3 * * ?", "2012-11-04T03:00:00-0500"}, - {"2012-11-04T03:00:00-0500", "0 0 3 * * ?", "2012-11-05T03:00:00-0500"}, - - // Unsatisfiable - {"Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", ""}, - {"Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", ""}, - } - - for _, c := range runs { - sched, err := Parse(c.spec) - if err != nil { - t.Error(err) - continue - } - actual := sched.Next(getTime(c.time)) - expected := getTime(c.expected) - if !actual.Equal(expected) { - t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) - } - } -} - -func TestErrors(t *testing.T) { - invalidSpecs := []string{ - "xyz", - "60 0 * * *", - "0 60 * * *", - "0 0 * * XYZ", - } - for _, spec := range invalidSpecs { - _, err := Parse(spec) - if err == nil { - t.Error("expected an error parsing: ", spec) - } - } -} - -func getTime(value string) time.Time { - if value == "" { - return time.Time{} - } - t, err := time.Parse("Mon Jan 2 15:04 2006", value) - if err != nil { - t, err = time.Parse("Mon Jan 2 15:04:05 2006", value) - if err != nil { - t, err = time.Parse("2006-01-02T15:04:05-0700", value) - if err != nil { - panic(err) - } - // Daylight savings time tests require location - if ny, err := time.LoadLocation("America/New_York"); err == nil { - t = t.In(ny) - } - } - } - - return t -} - -func TestNextWithTz(t *testing.T) { - runs := []struct { - time, spec string - expected string - }{ - // Failing tests - {"2016-01-03T13:09:03+0530", "0 14 14 * * *", "2016-01-03T14:14:00+0530"}, - {"2016-01-03T04:09:03+0530", "0 14 14 * * ?", "2016-01-03T14:14:00+0530"}, - - // Passing tests - {"2016-01-03T14:09:03+0530", "0 14 14 * * *", "2016-01-03T14:14:00+0530"}, - {"2016-01-03T14:00:00+0530", "0 14 14 * * ?", "2016-01-03T14:14:00+0530"}, - } - for _, c := range runs { - sched, err := Parse(c.spec) - if err != nil { - t.Error(err) - continue - } - actual := sched.Next(getTimeTZ(c.time)) - expected := getTimeTZ(c.expected) - if !actual.Equal(expected) { - t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) - } - } -} - -func getTimeTZ(value string) time.Time { - if value == "" { - return time.Time{} - } - t, err := time.Parse("Mon Jan 2 15:04 2006", value) - if err != nil { - t, err = time.Parse("Mon Jan 2 15:04:05 2006", value) - if err != nil { - t, err = time.Parse("2006-01-02T15:04:05-0700", value) - if err != nil { - panic(err) - } - } - } - - return t -} diff --git a/node/group.go b/node/group.go index f24b7f6..3932c70 100644 --- a/node/group.go +++ b/node/group.go @@ -2,6 +2,7 @@ package node import ( "github.com/shunfei/cronsun" + "github.com/shunfei/cronsun/log" ) type Groups map[string]*cronsun.Group @@ -40,6 +41,7 @@ func (l link) add(gid, jid, rid, gname string) { } func (l link) addJob(job *cronsun.Job) { + log.Debugf("job:%+v", job) for _, r := range job.Rules { for _, gid := range r.GroupIDs { l.add(gid, job.ID, r.ID, job.Group) diff --git a/node/node.go b/node/node.go index 336f027..9c60cf7 100644 --- a/node/node.go +++ b/node/node.go @@ -12,10 +12,10 @@ import ( "time" client "github.com/coreos/etcd/clientv3" + cron "github.com/robfig/cron/v3" "github.com/shunfei/cronsun" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" - "github.com/shunfei/cronsun/node/cron" "github.com/shunfei/cronsun/utils" ) @@ -182,11 +182,13 @@ func (n *Node) keepAlive() { func (n *Node) loadJobs() (err error) { log.Debugf("") + // 获取分组 if n.groups, err = cronsun.GetGroups(""); err != nil { return } log.Debugf("n.groups:%+v", n.groups) + // 获取所有job jobs, err := cronsun.GetJobs() if err != nil { return @@ -205,7 +207,8 @@ func (n *Node) loadJobs() (err error) { return } -func (n *Node) addJob(job *cronsun.Job, notice bool) { +// n.jobs和n.cmds的区别是什么? +func (n *Node) addJob(job *cronsun.Job, notice bool) error { log.Debugf("job:%+v, notice:%t", job, notice) n.link.addJob(job) @@ -218,13 +221,13 @@ func (n *Node) addJob(job *cronsun.Job, notice bool) { cmds := job.Cmds(n.ID, n.groups) log.Debugf("cmds:%+v", cmds) if len(cmds) == 0 { - return + return nil } for _, cmd := range cmds { n.addCmd(cmd, notice) } - return + return nil } func (n *Node) delJob(id string) { @@ -277,7 +280,8 @@ func (n *Node) modJob(job *cronsun.Job) { n.link.addJob(oJob) } -func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) { +// 将任务添加到cron中 +func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) error { log.Debugf("cmd:%+v, notice:%t", cmd, notice) n.Cron.Schedule(cmd.JobRule.Schedule, cmd) n.cmds[cmd.GetID()] = cmd @@ -285,7 +289,7 @@ func (n *Node) addCmd(cmd *cronsun.Cmd, notice bool) { if notice { log.Infof("job[%s] group[%s] rule[%s] timer[%s] has added", cmd.Job.ID, cmd.Job.Group, cmd.JobRule.ID, cmd.JobRule.Timer) } - return + return nil } func (n *Node) modCmd(cmd *cronsun.Cmd, notice bool) { @@ -311,16 +315,19 @@ func (n *Node) modCmd(cmd *cronsun.Cmd, notice bool) { } func (n *Node) delCmd(cmd *cronsun.Cmd) { + log.Debugf("cmd:%+v", cmd) delete(n.cmds, cmd.GetID()) n.Cron.DelJob(cmd) log.Infof("job[%s] group[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.Job.Group, cmd.JobRule.ID, cmd.JobRule.Timer) } func (n *Node) addGroup(g *cronsun.Group) { + log.Debugf("g:%+v", g) n.groups[g.ID] = g } func (n *Node) delGroup(id string) { + log.Debugf("id:%s", id) // delete job first defer n.link.delGroup(id) defer delete(n.groups, id)