Go has excellent support for concurrency. Let’s see how to implement a common concurrency pattern, the worker pool, in Go.
Worker pool
A worker pool is a fixed set of workers (goroutines, in Go) that wait for tasks to be assigned. This approach caps the number of operations that run concurrently. Starting a new goroutine for every task is usually impractical when the number of tasks is large, because unbounded concurrency can exhaust memory or overwhelm the services the tasks talk to.
Example
Let’s see an example of a worker pool. We will create a pool of workers and assign tasks to them.
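package workerpool

import (
	// fmt and time are used by the example tasks further down
	"fmt"
	"sync"
	"time"
)

// Task is any unit of work that exposes a Process method.
type Task interface {
	Process()
}

// WorkerPool is a collection of workers that drain a shared task queue.
type WorkerPool struct {
	taskList  []Task
	workers   int
	taskQueue chan Task
	wg        sync.WaitGroup
}

// New returns a pool that will run the given number of workers.
// The fields are unexported, so other packages need this constructor.
func New(workers int) *WorkerPool {
	return &WorkerPool{workers: workers}
}

// AddTask queues a task; nothing runs until Run is called.
func (wp *WorkerPool) AddTask(task Task) {
	wp.taskList = append(wp.taskList, task)
}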

// work is the loop run by each worker goroutine. It receives tasks from
// taskQueue and processes them until the channel is closed and drained.
func (wp *WorkerPool) work(id int) {
	// signal completion when the loop exits
	defer wp.wg.Done()

	// range blocks until a task arrives and ends once taskQueue is closed
	for task := range wp.taskQueue {
		task.Process()
	}
}

// Run starts the workers, feeds them every queued task, and blocks until
// all tasks have been processed.
func (wp *WorkerPool) Run() {
	// buffer the channel so that sending never blocks on a busy worker
	wp.taskQueue = make(chan Task, len(wp.taskList))

	// register the workers with the WaitGroup before starting them
	wp.wg.Add(wp.workers)
	for i := 0; i < wp.workers; i++ {
		go wp.work(i)
	}

	// send tasks to the workers via the taskQueue channel
	for _, task := range wp.taskList {
		wp.taskQueue <- task
	}

	// close the channel so that each worker's range loop can finish
	close(wp.taskQueue)

	// wait for all workers to finish
	wp.wg.Wait()
}
Any type with a Process method satisfies the Task interface and can be assigned to the worker pool. The tasks below are part of the workerpool package, which is why it imports fmt and time.
Example tasks:
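// EmailTask simulates sending an email.
type EmailTask struct {
	To      string
	Subject string
	Body    string
}

// Process prints the recipient and sleeps for a second to mimic real work.
func (e *EmailTask) Process() {
	fmt.Printf("Sending email to %s\n", e.To)
	time.Sleep(1 * time.Second)
}

// ApiCallTask simulates calling an external API.
type ApiCallTask struct {
	Url string
}

// Process prints the URL and sleeps for a second to mimic real work.
func (a *ApiCallTask) Process() {
	fmt.Printf("Calling API %s\n", a.Url)
	time.Sleep(1 * time.Second)
}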
Finally, let’s see a sample usage of the worker pool.
package main

import (
	"worker-pattern/workerpool"
)

func main() {
	// The pool's fields are unexported, so it is built through a constructor
	// in the workerpool package; New(3) creates a pool with three workers.
	wp := workerpool.New(3)

	// Queue 12 tasks: 6 iterations x 2 tasks each. The task types live in the
	// workerpool package; the address and URL are placeholder values.
	for i := 0; i < 6; i++ {
		wp.AddTask(&workerpool.EmailTask{To: "user@example.com"})
		wp.AddTask(&workerpool.ApiCallTask{Url: "https://example.com/api"})
	}

	// Run blocks until every task has been processed.
	wp.Run()
}
In the example above, we have a total of 12 tasks and 3 workers, so at most 3 tasks run at any moment. Since each task sleeps for one second, the whole run takes roughly four seconds instead of twelve.
This pattern is useful whenever you need to limit the number of concurrent goroutines.