Go’s concurrency model lets you build a worker pool where independent tasks are processed concurrently, but not too concurrently.
Let’s see this in action. Imagine you have a bunch of image resizing jobs to do. You don’t want to fire up a new goroutine for every single image, because that could overwhelm your system. Instead, you want a fixed number of "workers" that pick up jobs from a queue and process them.
Here’s a simple setup:
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Job represents a task to be processed.
type Job struct {
	ID int
	// Other job data...
}

// Result represents the outcome of a processed job.
type Result struct {
	JobID int
	// Other result data...
}

// worker is a goroutine that processes jobs from a channel.
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, job.ID)
		// Simulate work
		time.Sleep(time.Second)
		fmt.Printf("Worker %d finished job %d\n", id, job.ID)
		results <- Result{JobID: job.ID}
	}
}

func main() {
	const numJobs = 10
	const numWorkers = 3

	jobs := make(chan Job, numJobs)
	results := make(chan Result, numJobs)
	var wg sync.WaitGroup

	// Start workers
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, results, &wg)
	}

	// Send jobs to the jobs channel
	for j := 1; j <= numJobs; j++ {
		jobs <- Job{ID: j}
	}
	close(jobs) // Signal that no more jobs will be sent

	// Wait for all workers to finish
	wg.Wait()
	close(results) // Signal that no more results will be sent

	// Collect results (optional, but good for seeing completion)
	for r := range results {
		fmt.Printf("Collected result for job %d\n", r.JobID)
	}
	fmt.Println("All jobs processed.")
}
```
In this code:
- jobs is a buffered channel that acts as our job queue. Producers send Job structs to this channel.
- results is another buffered channel where workers send back Result structs.
- worker is the function that each worker goroutine runs. It continuously receives Jobs from the jobs channel. When the jobs channel is closed and empty, the range loop terminates and the worker exits.
- sync.WaitGroup lets the main goroutine wait until every worker goroutine has finished before it closes results and reads from it.
The numWorkers constant is your primary lever. For CPU-bound work, running more workers than cores rarely helps. On an 8-core machine with 1000 jobs, setting numWorkers to 1000 will likely be less efficient than setting it to around 8. Too many workers add scheduling overhead, cache thrashing, and contention for shared resources such as locks and memory bandwidth. Too few, and you’re leaving processing power unused. I/O-bound work is different: workers spend most of their time waiting, so a pool well above the core count can pay off. The optimal number depends on the nature of the work and the underlying hardware, and is best found by measuring.
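One common starting point is to size a CPU-bound pool from runtime.GOMAXPROCS(0) and scale it up for I/O-bound work. That call reports the current limit on OS threads executing Go code at once, without changing it. The sketch below assumes that value reflects the cores you actually have. The poolSize helper and its ioFactor multiplier are hypothetical. A real factor depends on how long tasks spend waiting and should come from measurement.

```go
package main

import (
	"fmt"
	"runtime"
)

// poolSize is a hypothetical helper that picks a worker count.
// runtime.GOMAXPROCS(0) reports (without changing) how many OS threads
// may execute Go code simultaneously, a reasonable size for CPU-bound work.
// For I/O-bound work the count is multiplied by ioFactor, an assumed
// tuning knob to calibrate by measurement, not a Go default.
func poolSize(ioBound bool, ioFactor int) int {
	n := runtime.GOMAXPROCS(0)
	if ioBound && ioFactor > 1 {
		n *= ioFactor
	}
	return n
}

func main() {
	fmt.Println("CPU-bound workers:", poolSize(false, 0))
	fmt.Println("I/O-bound workers:", poolSize(true, 4))
}
```

In the original example you would pass the result to the worker-starting loop in place of the numWorkers constant.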
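The buffering of the jobs channel matters too. A buffer size of numJobs means main can enqueue every job without blocking and move straight on to waiting for completion. If the buffer were 0 (unbuffered), each send would block until a worker was ready to receive it. That still works here, because the workers are started before any job is sent. In this layout the buffer on results is the one that is truly critical. main doesn’t read results until wg.Wait() returns, so if results could not hold every result, workers would block on their sends and never call wg.Done(). The program would then deadlock.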
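You may not want to size results to the total number of jobs. A common alternative is to move the wg.Wait() and close(results) pair into its own goroutine, so the caller can drain results while workers are still producing them. With that arrangement both channels can be unbuffered without deadlocking. In this sketch, squareAll is a hypothetical function, and squaring an int stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll runs a fixed pool of workers over inputs and returns the sum
// of the squares. Both channels are unbuffered: the caller reads results
// while workers are still running, so no send blocks forever.
func squareAll(inputs []int, numWorkers int) int {
	jobs := make(chan int)    // each send waits for a free worker
	results := make(chan int) // each send waits for the collector below
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- n * n
			}
		}()
	}

	// Feed jobs from a separate goroutine so the caller can start
	// collecting results immediately.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results only after every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 3)) // prints 30 (1+4+9+16)
}
```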
The close(jobs) call is crucial. It signals to the range loop in the worker function that no more values will be sent, allowing the loop to terminate gracefully once all existing jobs have been processed. Similarly, closing the results channel after wg.Wait() signals that no more results are coming, allowing any goroutine ranging over results to finish.
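Closing jobs still lets workers finish the jobs already queued. Values buffered before close are delivered normally. A receive reports the channel as closed only once the buffer is empty. The small sketch below checks this. The drainAfterClose name is only illustrative.

```go
package main

import "fmt"

// drainAfterClose shows that closing a channel does not discard values
// already in its buffer.
func drainAfterClose() (received []int, okAfter bool) {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // no more sends allowed; the three buffered values remain

	for v := range ch { // receives 1, 2, 3, then exits because ch is closed
		received = append(received, v)
	}

	// Further receives return immediately with the zero value and ok == false.
	_, okAfter = <-ch
	return received, okAfter
}

func main() {
	vals, ok := drainAfterClose()
	fmt.Println(vals, ok) // prints [1 2 3] false
}
```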
What stands out about Go’s concurrency primitives is how naturally the go and chan keywords work together. range over a channel also replaces the explicit loop of receiving a value and checking whether the channel has been closed. Nothing polls: each receive simply blocks until a value or the close arrives. Because sends and receives block, channels do more than carry data. They also decide when each goroutine gets to run.
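To make the comparison concrete, here is the worker’s loop written without range. The comma-ok receive (job, ok := <-jobs) is the part that range hides. workerExplicit and its handle callback are hypothetical names, and Job mirrors the type from the main example.

```go
package main

import "fmt"

// Job mirrors the type from the worker pool example.
type Job struct {
	ID int
}

// workerExplicit spells out what `for job := range jobs` does:
// block on a receive, and stop once the channel is closed and empty.
func workerExplicit(jobs <-chan Job, handle func(Job)) {
	for {
		job, ok := <-jobs // blocks until a value arrives or jobs is closed
		if !ok {
			return // closed and drained: the same point where range exits
		}
		handle(job)
	}
}

func main() {
	jobs := make(chan Job, 3)
	for i := 1; i <= 3; i++ {
		jobs <- Job{ID: i}
	}
	close(jobs)

	var seen []int
	workerExplicit(jobs, func(j Job) { seen = append(seen, j.ID) })
	fmt.Println(seen) // prints [1 2 3]
}
```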
When you run this, you’ll see workers picking up jobs concurrently, but never more than numWorkers (3 in this case) active at any given moment, even though there are 10 jobs. Ten one-second jobs in batches of three take roughly four seconds in total.
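You can confirm that bound instead of reading it off the log. Count in-flight jobs with an atomic counter and record its high-water mark. In this sketch, peakConcurrency is a hypothetical helper. It relies on atomic.Int64, available since Go 1.19. The 10 ms sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs numJobs short jobs through numWorkers workers and
// reports the largest number of jobs that were in progress at once.
func peakConcurrency(numWorkers, numJobs int) int64 {
	jobs := make(chan int, numJobs)
	var active, peak atomic.Int64
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				n := active.Add(1)
				// Raise peak to n if n is larger, retrying if another worker races us.
				for {
					p := peak.Load()
					if n <= p || peak.CompareAndSwap(p, n) {
						break
					}
				}
				time.Sleep(10 * time.Millisecond) // simulated work
				active.Add(-1)
			}
		}()
	}

	for j := 0; j < numJobs; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak in-flight jobs:", peakConcurrency(3, 10))
}
```

The reported peak can never exceed numWorkers. On a busy machine it may come out lower, since workers are not guaranteed to overlap.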
Next, you’ll want to explore how to handle errors gracefully within your worker pool, perhaps by sending error results back on a separate channel or by using a context for cancellation.
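Here is one possible starting shape, sketched under two assumptions. First, errors travel back inside the result, through an Err field on a hypothetical Result variant, rather than on a separate channel. Second, a context.Context stops the producer after the first failure. process and runPool are illustrative names, not part of the earlier example. Workers finish any jobs they already hold, and only new jobs stop being handed out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Result is a variant of the earlier type that also carries an error.
type Result struct {
	JobID int
	Err   error
}

// process is a hypothetical job handler that fails for job 3.
func process(id int) error {
	if id == 3 {
		return errors.New("bad input")
	}
	return nil
}

// runPool processes job IDs 1..numJobs, stops handing out new jobs after
// the first error, and returns that error (or nil).
func runPool(numJobs, numWorkers int) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan int)
	results := make(chan Result)
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				results <- Result{JobID: id, Err: process(id)}
			}
		}()
	}

	// Producer: stop sending as soon as the context is cancelled.
	go func() {
		defer close(jobs)
		for id := 1; id <= numJobs; id++ {
			select {
			case jobs <- id:
			case <-ctx.Done():
				return
			}
		}
	}()

	go func() {
		wg.Wait()
		close(results)
	}()

	// Keep draining results after an error so no worker blocks on its send.
	var firstErr error
	for r := range results {
		if r.Err != nil && firstErr == nil {
			firstErr = fmt.Errorf("job %d: %w", r.JobID, r.Err)
			cancel()
		}
	}
	return firstErr
}

func main() {
	if err := runPool(10, 3); err != nil {
		fmt.Println("pool stopped:", err) // prints pool stopped: job 3: bad input
	}
}
```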