Dynamic scheduler task in golang

Goal

Create a dynamic scheduler task in golang

Description

This recipe shows how to use Go's timer facilities (time.AfterFunc) to run a task repeatedly, choosing the delay before each run from the number of records the previous run returned. The scheduler therefore runs more or less often according to a strategy which, in this example, works as follows:

  • if the task returned no records, wait the maximum configured time
  • if it returned fewer records than the number asked for, wait the medium waiting time
  • otherwise (a full batch was returned), wait the minimum waiting time

How to

Let's start with the mock service (service.go). The service has a method Do(), which returns a slice of the processed records and an error if something goes wrong, and a method RecordsToProcess(), which returns the number of records asked for. In this implementation, Do() returns a different number of rows depending on the value of its internal counter, so that we can see the different waiting times being calculated:

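package scheduler

import "fmt"

// Service is a task that processes records in batches.
type Service interface {
	// Do processes one batch and returns the processed records.
	Do() ([]string, error)
	// RecordsToProcess returns the number of records asked for per batch.
	RecordsToProcess() int
}

// New returns the mock service.
func New() Service {
	return &svc{}
}

type svc struct {
	counter int
}

// Do returns four records by default, no records on every 7th call and a
// single record on every 3rd call (the 3rd-call rule wins when both apply).
func (s *svc) Do() ([]string, error) {
	res := []string{"1", "2", "3", "4"}
	s.counter = s.counter + 1

	if s.counter%7 == 0 {
		res = nil
	}

	if s.counter%3 == 0 {
		res = []string{"1"}
	}

	fmt.Println("doing with:", res)

	return res, nil
}

func (s *svc) RecordsToProcess() int {
	return 2
}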

The scheduler (scheduling.go) is a struct that wraps the service and provides its own Do() method. That method calls the wrapped service and then schedules its own next execution, based on the number of rows returned:


package scheduler

import (
	"fmt"
	"time"
)

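// Default waiting times, used when ConfigScheduling leaves a field at zero.
const (
	schedulerMaxWaitTime    = 10 * time.Second
	schedulerMediumWaitTime = 5 * time.Second
	schedulerMinWaitTime    = 0 * time.Second
)

// schedulerFunction runs a function after a delay. Keeping it in a variable
// allows it to be replaced, for example in tests.
var (
	schedulerFunction = time.AfterFunc
)

// scheduler wraps a Service and reschedules it after every call to Do.
type scheduler struct {
	Service
	ID     string
	config ConfigScheduling
}

// ConfigScheduling configures the waiting times between runs.
type ConfigScheduling struct {
	MaxWaitTime    time.Duration
	MediumWaitTime time.Duration
	MinWaitTime    time.Duration
}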

// NewRescheduler wraps svc so that each call to Do schedules the next one.
func NewRescheduler(c ConfigScheduling, svc Service, id string) Service {
	// If a duration is not defined in the config, the default is used.
	if c.MaxWaitTime == 0 {
		c.MaxWaitTime = schedulerMaxWaitTime
	}
	if c.MediumWaitTime == 0 {
		c.MediumWaitTime = schedulerMediumWaitTime
	}
	if c.MinWaitTime == 0 {
		c.MinWaitTime = schedulerMinWaitTime
	}

	return scheduler{
		config:  c,
		Service: svc,
		ID:      id,
	}
}

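// Do runs the wrapped service once and schedules the next run. The next run
// is scheduled even when the service returns an error.
func (s scheduler) Do() (processedRecords []string, err error) {
	processedRecords, err = s.Service.Do()

	schedulerFunction(s.nextDuration(processedRecords), func() {
		_, _ = s.Do()
	})
	return processedRecords, err
}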

// nextDuration picks the waiting time from the number of records returned.
func (s scheduler) nextDuration(processedRecords []string) time.Duration {
	wait := s.config.MinWaitTime // full batch: run again as soon as possible
	switch {
	case len(processedRecords) == 0:
		wait = s.config.MaxWaitTime
	case len(processedRecords) < s.RecordsToProcess():
		wait = s.config.MediumWaitTime
	}

	fmt.Println("scheduling", s.ID, "to run in "+wait.String()+" from now")
	return wait
}

Finally, here’s the main.go file with the setup of the example code:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"myrepo.com/scheduler"
)

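func main() {
	go schedule()

	// Block until SIGINT (Ctrl+C) arrives, then print it and exit.
	errs := make(chan error, 1)
	go func() {
		// signal.Notify never blocks when sending, so the channel must be buffered.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT)
		errs <- fmt.Errorf("%s", <-c)
	}()

	err := <-errs
	if err != nil {
		fmt.Println(err.Error())
	}
}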

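// schedule wraps the mock service in the rescheduler, using the default
// waiting times, and starts the first run.
func schedule() {
	taskService := scheduler.New()
	taskService = scheduler.NewRescheduler(scheduler.ConfigScheduling{}, taskService, "test")
	_, _ = taskService.Do()
}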

Explanation

No further explanation seems necessary.
