Skip to content

Commit 8e553c0

Browse files
authored
fix: remove jobs and respect stopTime in NextRuns when WithStopDateTime is set (#922)
1 parent 54f6bd8 commit 8e553c0

4 files changed

Lines changed: 127 additions & 1 deletion

File tree

job.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1635,7 +1635,11 @@ func (j job) NextRuns(count int) ([]time.Time, error) {
16351635
}
16361636

16371637
from := out[i-1]
1638-
out[i] = ij.next(from)
1638+
next := ij.next(from)
1639+
if !ij.stopTime.IsZero() && !next.Before(ij.stopTime) {
1640+
return out[:i], nil
1641+
}
1642+
out[i] = next
16391643
}
16401644

16411645
return out, nil

job_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,37 @@ func TestJob_NextRuns(t *testing.T) {
10431043
}
10441044
}
10451045

1046+
func TestJob_NextRuns_StopTime(t *testing.T) {
1047+
stopTime := time.Now().Add(350 * time.Millisecond)
1048+
1049+
s := newTestScheduler(t)
1050+
j, err := s.NewJob(
1051+
DurationJob(100*time.Millisecond),
1052+
NewTask(func() {}),
1053+
WithStopAt(WithStopDateTime(stopTime)),
1054+
)
1055+
require.NoError(t, err)
1056+
1057+
s.Start()
1058+
time.Sleep(50 * time.Millisecond)
1059+
1060+
// nextScheduled must not contain any time at or after stopTime
1061+
ij := requestJob(j.ID(), j.(*job).jobOutRequest)
1062+
require.NotNil(t, ij)
1063+
for _, ns := range ij.nextScheduled {
1064+
assert.True(t, ns.Before(stopTime), "nextScheduled contains time after stopTime: %v >= %v", ns, stopTime)
1065+
}
1066+
1067+
// NextRuns with a large count must all be before stopTime
1068+
runs, err := j.NextRuns(100)
1069+
require.NoError(t, err)
1070+
for _, r := range runs {
1071+
assert.True(t, r.Before(stopTime), "NextRuns returned time after stopTime: %v >= %v", r, stopTime)
1072+
}
1073+
1074+
require.NoError(t, s.Shutdown())
1075+
}
1076+
10461077
func TestJob_PanicOccurred(t *testing.T) {
10471078
gotCh := make(chan any)
10481079
errCh := make(chan error)

scheduler.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
363363
}
364364

365365
if j.stopTimeReached(s.now()) {
366+
s.selectRemoveJob(id)
366367
return
367368
}
368369

@@ -422,6 +423,11 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
422423
}
423424
}
424425

426+
if !j.stopTime.IsZero() && !next.Before(j.stopTime) {
427+
s.selectRemoveJob(id)
428+
return
429+
}
430+
425431
// Clean up any existing timer to prevent leaks
426432
if j.timer != nil {
427433
j.timer.Stop()
@@ -548,6 +554,13 @@ func (s *scheduler) selectNewJob(in newJobIn) {
548554
}
549555
}
550556

557+
if !j.stopTime.IsZero() && !next.Before(j.stopTime) {
558+
s.jobs[j.id] = j
559+
in.cancel()
560+
s.selectRemoveJob(j.id)
561+
return
562+
}
563+
551564
id := j.id
552565
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
553566
select {
@@ -609,6 +622,11 @@ func (s *scheduler) selectStart() {
609622
}
610623
}
611624

625+
if !j.stopTime.IsZero() && !next.Before(j.stopTime) {
626+
s.selectRemoveJob(id)
627+
continue
628+
}
629+
612630
jobID := id
613631
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
614632
select {

scheduler_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3190,3 +3190,76 @@ func TestScheduler_JobSchedule(t *testing.T) {
31903190
})
31913191
}
31923192
}
3193+
3194+
func TestScheduler_WithStopDateTime_JobRemovedAfterStopTime(t *testing.T) {
3195+
defer verifyNoGoroutineLeaks(t)
3196+
3197+
t.Run("job is removed from scheduler after stop time elapses", func(t *testing.T) {
3198+
monitor := newTestSchedulerMonitor()
3199+
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
3200+
3201+
_, err := s.NewJob(
3202+
DurationJob(50*time.Millisecond),
3203+
NewTask(func() {}),
3204+
WithStopAt(WithStopDateTime(time.Now().Add(200*time.Millisecond))),
3205+
WithStartAt(WithStartImmediately()),
3206+
)
3207+
require.NoError(t, err)
3208+
3209+
s.Start()
3210+
3211+
require.Eventually(t, func() bool {
3212+
return len(s.Jobs()) == 0
3213+
}, time.Second, 10*time.Millisecond, "job should be removed after stop time")
3214+
3215+
assert.GreaterOrEqual(t, monitor.getJobUnregCount(), int64(1), "monitor should receive JobUnregistered notification")
3216+
3217+
require.NoError(t, s.Shutdown())
3218+
})
3219+
3220+
t.Run("job added before start is removed on start when stop time already elapsed", func(t *testing.T) {
3221+
monitor := newTestSchedulerMonitor()
3222+
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
3223+
3224+
_, err := s.NewJob(
3225+
DurationJob(time.Hour),
3226+
NewTask(func() {}),
3227+
WithStopAt(WithStopDateTime(time.Now().Add(100*time.Millisecond))),
3228+
)
3229+
require.NoError(t, err)
3230+
3231+
// wait until stop time has passed, then start the scheduler
3232+
time.Sleep(150 * time.Millisecond)
3233+
s.Start()
3234+
3235+
require.Eventually(t, func() bool {
3236+
return len(s.Jobs()) == 0
3237+
}, time.Second, 10*time.Millisecond, "job should be removed when scheduler starts after stop time")
3238+
3239+
require.NoError(t, s.Shutdown())
3240+
})
3241+
3242+
t.Run("RemoveJob on already auto-removed job returns ErrJobNotFound", func(t *testing.T) {
3243+
s := newTestScheduler(t)
3244+
3245+
j, err := s.NewJob(
3246+
DurationJob(50*time.Millisecond),
3247+
NewTask(func() {}),
3248+
WithStopAt(WithStopDateTime(time.Now().Add(150*time.Millisecond))),
3249+
WithStartAt(WithStartImmediately()),
3250+
)
3251+
require.NoError(t, err)
3252+
3253+
s.Start()
3254+
3255+
require.Eventually(t, func() bool {
3256+
return len(s.Jobs()) == 0
3257+
}, time.Second, 10*time.Millisecond, "job should be auto-removed after stop time")
3258+
3259+
// Explicitly removing an already auto-removed job should return ErrJobNotFound
3260+
err = s.RemoveJob(j.ID())
3261+
assert.ErrorIs(t, err, ErrJobNotFound)
3262+
3263+
require.NoError(t, s.Shutdown())
3264+
})
3265+
}

0 commit comments

Comments
 (0)