diff --git a/cron.go b/cron.go index 6703643..bbef6ff 100644 --- a/cron.go +++ b/cron.go @@ -88,6 +88,17 @@ type Entry struct { // Valid returns true if this is not the zero entry. func (e Entry) Valid() bool { return e.ID != 0 } +// ScheduleFirst is used for the initial scheduling. If a Prev value has been +// included with the Entry, it will be used in place of "now" to allow schedules +// to be preserved across process restarts. +func (e Entry) ScheduleFirst(now time.Time) time.Time { + if !e.Prev.IsZero() { + return e.Schedule.Next(e.Prev) + } else { + return e.Schedule.Next(now) + } +} + // byTime is a wrapper for sorting the entry array by time // (with zero time at the end). type byTime []*Entry @@ -154,25 +165,25 @@ func (f FuncJob) Run() { f() } // AddFunc adds a func to the Cron to be run on the given schedule. // The spec is parsed using the time zone of this Cron instance as the default. // An opaque ID is returned that can be used to later remove it. -func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { - return c.AddJob(spec, FuncJob(cmd)) +func (c *Cron) AddFunc(spec string, cmd func(), entryOpts ...EntryOption) (EntryID, error) { + return c.AddJob(spec, FuncJob(cmd), entryOpts...) } // AddJob adds a Job to the Cron to be run on the given schedule. // The spec is parsed using the time zone of this Cron instance as the default. // An opaque ID is returned that can be used to later remove it. -func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { +func (c *Cron) AddJob(spec string, cmd Job, entryOpts ...EntryOption) (EntryID, error) { schedule, err := c.parser.Parse(spec) if err != nil { return 0, err } - return c.Schedule(schedule, cmd), nil + return c.Schedule(schedule, cmd, entryOpts...), nil } // Schedule adds a Job to the Cron to be run on the given schedule. // The job is wrapped with the configured Chain. -func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { +func (c *Cron) Schedule(schedule Schedule, cmd Job, entryOpts ...EntryOption) EntryID { c.runningMu.Lock() defer c.runningMu.Unlock() c.nextID++ @@ -182,6 +193,9 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { WrappedJob: c.chain.Then(cmd), Job: cmd, } + for _, fn := range entryOpts { + fn(entry) + } if !c.running { c.entries = append(c.entries, entry) } else { @@ -223,6 +237,18 @@ func (c *Cron) UpdateSchedule(id EntryID, schedule Schedule) error { return errors.New(fmt.Sprintf("invalid ID provided: %d", id)) } +// EntryOption is a hook which allows the Entry to be altered before being +// committed internally. +type EntryOption func(*Entry) + +// EntryPrev allows setting the Prev time to allow interval-based schedules to +// preserve their timeline even in the face of process restarts. +func WithPrev(prev time.Time) EntryOption { + return func(e *Entry) { + e.Prev = prev + } +} + // Entries returns a snapshot of the cron entries. func (c *Cron) Entries() []Entry { c.runningMu.Lock() @@ -306,7 +332,7 @@ func (c *Cron) run() { // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) + entry.Next = entry.ScheduleFirst(now) c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) } @@ -344,7 +370,7 @@ func (c *Cron) run() { case newEntry := <-c.add: timer.Stop() now = c.now() - newEntry.Next = newEntry.Schedule.Next(now) + newEntry.Next = newEntry.ScheduleFirst(now) c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) diff --git a/cron_test.go b/cron_test.go index c690531..7b67190 100644 --- a/cron_test.go +++ b/cron_test.go @@ -679,6 +679,20 @@ func TestStopAndWait(t *testing.T) { }) } +func TestJobWithCustomPrev(t *testing.T) { + cron := New() + var calls int64 + // running every 3s, but starting 2s in the past + // expected timeline: 1s ... 4s ... stop (2 calls) + // if prev was ignored, the func would only be called once (at 3s) + cron.AddFunc("@every 3s", func() { atomic.AddInt64(&calls, 1) }, WithPrev(time.Now().Add(-2*time.Second))) + cron.Start() + time.Sleep(5 * time.Second) + if atomic.LoadInt64(&calls) != 2 { + t.Errorf("called %d times, expected 2\n", calls) + } +} + func TestMultiThreadedStartAndStop(t *testing.T) { cron := New() go cron.Run()