Goal
Create a dynamic scheduler task in golang
Description
This recipe tells you how to use the scheduling abilities from golang to schedule a task to be processed, based on the number of records to be processed at a given time. The code in the recipe will make it possible for the scheduler to process more or less often based on a strategy that, in the example, depends on the number of records to be processed at a given time, as follows:
- if the number of records to be processed is the same as the number of records asked for processing, then the minimum waiting time should be used
- if the number of records to be processed is lower than the number of records asked for processing, the the medium waiting time should be used
- otherwise, wait the maximum configured time
How to
Lets start with the mock service (service.go). This service has a method Do()
which returns a slice of the records to be processed and an error, if something wrong happens. In its implementation, Do returns a different number of rows, depending on the number of its internal counter so that we can get different waiting times calculated:
package scheduler
import "fmt"
// Service .
type Service interface {
Do() ([]string, error)
RecordsToProcess() int
}
func New() Service {
return &svc{}
}
type svc struct {
counter int
}
func (s *svc) Do() ([]string, error) {
res := []string{"1", "2", "3", "4"}
s.counter = s.counter + 1
if s.counter%7 == 0 {
res = nil
}
if s.counter%3 == 0 {
res = []string{"1"}
}
fmt.Println("doing with:", res)
return res, nil
}
func (s *svc) RecordsToProcess() int {
return 2
}
For the Scheduler service (scheduling.go), we can see the scheduler struct as a wrapper around the service and a custom implementation of the Do()
method that will schedule the next execution of the service, based on the number of returned rows.
import (
"fmt"
"time"
)
const (
schedulerMaxWaitTime = 10 * time.Second
schedulerMediumWaitTime = 5 * time.Second
schedulerMinWaitTime = 0 * time.Second
)
var (
schedulerFunction = time.AfterFunc
)
type scheduler struct {
Service
ID string
config ConfigScheduling
}
// ConfigScheduling configures times for Crons
type ConfigScheduling struct {
MaxWaitTime time.Duration
MediumWaitTime time.Duration
MinWaitTime time.Duration
}
func NewRescheduler(c ConfigScheduling, svc Service, id string) Service {
// if config duration is not defined it will use default time.
if c.MaxWaitTime == 0 {
c.MaxWaitTime = schedulerMaxWaitTime
}
if c.MediumWaitTime == 0 {
c.MediumWaitTime = schedulerMediumWaitTime
}
if c.MinWaitTime == 0 {
c.MinWaitTime = schedulerMinWaitTime
}
return scheduler{
config: c,
Service: svc,
ID: id,
}
}
func (s scheduler) Do() (processedRecords []string, errors error) {
processedRecords, errors = s.Service.Do()
schedulerFunction(s.nextDuration(processedRecords), func() {
s.Do()
})
return
}
func (s scheduler) nextDuration(processedRecords []string) time.Duration {
if len(processedRecords) == 0 {
fmt.Println("scheduling", s.ID, "to run in "+s.config.MaxWaitTime.String()+" from now")
return s.config.MaxWaitTime
} else if len(processedRecords) < s.RecordsToProcess() {
fmt.Println("scheduling", s.ID, "to run in "+s.config.MediumWaitTime.String()+" from now")
return s.config.MediumWaitTime
}
fmt.Println("scheduling", s.ID, "to run in "+s.config.MinWaitTime.String()+" from now")
return s.config.MinWaitTime
}
Finally, here’s the main.go file with the setup of the example code:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"myrepo.com/scheduler"
)
func main() {
go schedule()
// graceful kill
errs := make(chan error, 1)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
err := <-errs
if err != nil {
fmt.Println(err.Error())
}
}
func schedule() {
taskService := scheduler.New()
taskService = scheduler.NewRescheduler(scheduler.ConfigScheduling{}, taskService, "test")
_, _ = taskService.Do()
}
Explanation
No further explanation seems necessary.
One Reply to “”