今回は、Goの非同期処理に親しむためにタスクをバッチ実行する簡単なスケジューラを作ってみました。
その時の内容をこの記事にまとめて、社内勉強会で紹介します。
使用している機能
- goroutine
- channel
- sync
バッチサーバの要件
- それぞれのタスクをgoroutineで並行処理する
- タスクを一定間隔で定期実行する
- 前のタスクが終わっていない場合は定期実行をスキップする
それではさっそく作っていきます
まずは下記のようにJob Schedulerを定義します。
type Scheduler struct {
jobs []Job
wg *sync.WaitGroup
}
type Job struct {
exit atomic.Uint32
name string
function func()
duration time.Duration
quit chan bool
mu sync.Mutex
}
Job構造体にはそれぞれ実行したいタスクをfunctionとして関数を指定できます。
次に、一定間隔でタスクを実行するためのメソッドを作成します。
func (j *Job) do(wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(j.duration)
for {
select {
case <-ticker.C:
if j.alive() {
j.mu.Lock()
j.function() // タスク実行
j.mu.Unlock()
}
case <-j.quit:
ticker.Stop()
return
}
}
}
selectは複数のチャネルを待ち受ける時に便利な機能です。
Tickerで定期的に信号を受け取り、タスクを実行しています。
Jobはそれぞれmutexを持っており、タスクを排他実行します。
mutexは排他ロックを簡単に実装できるものです。(https://pkg.go.dev/sync#Mutex)
これによって前のタスクがまだ実行中の場合は次のタスクをスキップします。
また、quit信号を受け取ったときはtickerを止めてWaitGroupを実行済みにします。
続いてSchedulerの実行と停止のメソッドを作成します。
func (s *Scheduler) Run() {
for i := 0; i < len(s.jobs); i++ {
s.wg.Add(1)
go s.jobs[i].do(s.wg) // goroutineで実行
}
}
func (s *Scheduler) Stop(done chan bool) {
for i := 0; i < len(s.jobs); i++ {
s.jobs[i].terminate()
s.jobs[i].quit <- true
}
s.wg.Wait()
done <- true
}
SchedulerはWaitGroupを持っており、Stopメソッドが呼ばれた時は
実行中のJobの終了を待ち合わせてから停止完了の信号をチャネルdoneへ送信します。
あとは便利のため、周辺の機能をメソッドに切り出して作成しておきます。
func NewScheduler(j []Job) *Scheduler {
var wg sync.WaitGroup
return &Scheduler{j, &wg}
}
func NewJob(name string, f func(), d time.Duration) Job {
return Job{
name: name,
duration: d,
function: f,
quit: make(chan bool),
}
}
// Jobの停止を登録
func (j *Job) terminate() {
j.exit.Store(1)
}
// Jobに停止信号が送られていないか確認
func (j *Job) alive() bool {
return j.exit.Load() == 0
}
Jobの持っているexitにはsyncパッケージのatomic.Uint32を使っています。
(https://pkg.go.dev/sync/atomic)
メモリへのアトミックな読み書きができるので、非同期な処理の際はこちらを使った方がいいかなと思います。
ただし、atomicのドキュメントにも記載がある通り、使用には細心の注意が必要です。
These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don’t communicate by sharing memory.
syncパッケージで事足りるのであればそちらを使うべきです。
今回はsyncパッケージのonce内でのatomicの使われ方を参考にしてみました。
https://cs.opensource.google/go/go/+/refs/tags/go1.21.0:src/sync/once.go
スケジューラの実装は以上です。
実行例は以下のようになります。
func main() {
jobs := []Job{
NewJob(
"JobA",
func() { fmt.Println("JobA") },
time.Second,
),
NewJob(
"JobB",
func() {
time.Sleep(3 * time.Second)
fmt.Println("JobB")
},
time.Second,
),
}
s := NewScheduler(jobs)
s.Run()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
done := make(chan bool, 1)
s.Stop(done)
<-done
fmt.Println("server exiting.")
}
上記の例では、それぞれのタスクは毎秒実行されますが、JobBは実行に3秒かかる想定です。
Jobの実行中は重複実行されませんので、このJobは3秒ごとに”JobB”を出力することになります。
バッチサーバはCtrl+Cなどのシグナルを受け取るまで実行状態になります。
停止シグナルを受け取るとSchedulerのStopメソッドを実行し、
実行中のタスクが全て完了するのを待ち合わせてから終了します。
まとめ
goroutineを使うことで、非常に簡単に並行処理の動作を実現できます。
ただし、スレッド間の情報のやり取り(channel)は実行されるタイミングをよく検討しないと
うまく動作しないので、こちらには慣れる必要があります。
参考
goroutineやchannelなど並行処理について
https://go.dev/tour/concurrency/1
syncパッケージ
https://pkg.go.dev/sync