Concurrency is built into the Go programming language. This feature alone makes Go a popular choice for software systems with high request throughput. In this blog, we are going to dig into some of the common concurrency patterns you may encounter. Before we go ahead, this blog assumes some basic knowledge of Go and its support for channels and goroutines.
Note that the code here is purely educational and requires plenty of modification before being applied to real-world systems.
Concurrency
In my words, concurrency is enabling software to perform independent computations, helping us achieve a task more efficiently. This is not exactly the same as running things in parallel. Concurrency takes advantage of the processor’s ability to switch between multiple tasks scheduled on it: tasks that are waiting on I/O, the network, disk, etc. are efficiently switched between to keep the processor busy. Parallelism, on the other hand, is actually executing multiple tasks simultaneously on multiple available threads.
In Go, we achieve concurrency by creating goroutines and handing them to Go’s built-in scheduler, which runs them concurrently. The details of how the Go runtime achieves this are out of scope for this blog, but we’ll cover them in a future one :). Let’s not wait any longer, and jump into some common concurrency patterns in Go.
Common Patterns
Most of the common concurrency patterns you may need to implement in your software systems fall into one of the categories listed below.
- Single Consumer Single Producer (SCSP)
- Multiple Consumer Multiple Producer (MCMP)
- Multiple Consumer Single Producer (MCSP)
- Single Consumer Multiple Producer (SCMP)
Now, let’s dig into each of these patterns with an example to showcase how they help in such scenarios.
Single Consumer Single Producer
Think of a printing queue: one task issues print commands with certain data, and a printer task reads that data and prints it out. The issuing task can continue to issue print commands irrespective of whether the printer is able to print; the printer prints whenever its requirements (like the hardware being available) are met.
Now, let’s write it in Go:
package main

import "fmt"

func main() {
	dataToPrint := make(chan string)
	// producer: issues ten print commands, then closes the channel
	go func() {
		for i := 0; i < 10; i++ {
			dataToPrint <- fmt.Sprintf("data#%v", i)
		}
		close(dataToPrint)
	}()
	// consumer: the main goroutine prints each command as it arrives
	for data := range dataToPrint {
		fmt.Println(data)
	}
}
In the above code, a goroutine produces the print events and adds them to a channel called dataToPrint. The main goroutine consumes the channel and prints the data out, in this case to stdout.
Multiple Consumer Multiple Producer
Let’s consider an e-commerce application, a classic case, where multiple orders are placed into a queue, and multiple jobs process those orders concurrently to increase throughput.
package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(producerID int, wg *sync.WaitGroup, orders chan<- string) {
	defer wg.Done()
	for j := 0; j < 10; j++ {
		orders <- fmt.Sprintf("producer process %v produced order %v", producerID, j)
		time.Sleep(100 * time.Millisecond)
	}
}

func consumer(consumerID int, wg *sync.WaitGroup, orders <-chan string) {
	defer wg.Done()
	for order := range orders {
		fmt.Printf("consumer process %v read order with message: %v\n", consumerID, order)
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	orders := make(chan string)
	var producerWg sync.WaitGroup
	var consumerWg sync.WaitGroup
	// spin up three producers
	for i := 0; i < 3; i++ {
		producerWg.Add(1)
		go producer(i, &producerWg, orders)
	}
	// and three consumers
	for i := 0; i < 3; i++ {
		consumerWg.Add(1)
		go consumer(i, &consumerWg, orders)
	}
	producerWg.Wait() // wait for producers to finish
	close(orders)     // then close the channel so the consumers' range loops end
	consumerWg.Wait() // wait for consumers to drain the remaining orders
}
In the above piece of code, the producers write messages into an unbuffered channel, so each producer blocks on a send until some consumer receives that message. The consumers read the messages from the channel and print them to stdout.
In our main function, we create one WaitGroup for the producers and another for the consumers. We increment each WaitGroup with wg.Add(1) as we spin up three producer goroutines and three consumer goroutines, which do their jobs of producing and consuming messages concurrently. Inside each goroutine we defer wg.Done(), which decrements the counter when that goroutine finishes.
The wg.Wait() call blocks the calling goroutine until the WaitGroup’s counter reaches zero. Once we’re done producing the messages, we close the channel, signalling to the consumers that they can finish once the remaining messages are read. The last statement waits for all the consumers to finish their job, and then lets the main goroutine exit the program.
The above is an example of how we can communicate messages using channels with multiple producers and multiple consumers.
Single Consumer Multiple Producer (SCMP)
Consider a system where logs need to be accumulated from multiple goroutines and written to a single destination. Let’s see how this looks in a Go program.
package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(producerID int, wg *sync.WaitGroup, logs chan<- string) {
	defer wg.Done()
	for j := 0; j < 10; j++ {
		logs <- fmt.Sprintf("log line #%v from producer #%v", j, producerID)
		time.Sleep(100 * time.Millisecond)
	}
}

func consumer(logs <-chan string) {
	for log := range logs {
		fmt.Println(log)
	}
}

func main() {
	logs := make(chan string, 10) // buffered: producers can get ahead of the consumer
	var producerWg sync.WaitGroup
	for i := 0; i < 3; i++ {
		producerWg.Add(1)
		go producer(i, &producerWg, logs)
	}
	go consumer(logs)
	producerWg.Wait()
	close(logs)
}
The above program is almost identical to the MCMP example; the only differences are that a single goroutine consumes all the log lines, and the channel is buffered. The buffered channel allows the producers to keep writing log lines into the channel, even when no one is consuming them, until the specified capacity (in this case 10) is reached.
We have a consumer goroutine which reads those messages from the channel, and a wg.Wait() that lets the producers finish their job before we close the channel, which in turn lets the consumer know about the channel closure. Notice, however, that nothing blocks the main goroutine after close(logs): the program may exit before the consumer has printed every buffered line. If you need a guarantee that all logs are flushed, the consumer must signal its completion back to main, for example with its own WaitGroup or a done channel that main waits on after the close.
Multiple Consumer Single Producer (MCSP)
Consider a case where stock prices are updated by a single producer, and multiple consumers read that data and act on it accordingly. You should be able to write this by yourself now, with all the examples mentioned above. Here’s a simple version of it:
package main

import (
	"fmt"
	"sync"
	"time"
)

func consumer(consumerID int, wg *sync.WaitGroup, prices <-chan int) {
	defer wg.Done()
	for price := range prices {
		fmt.Printf("consumer %v received stock price update with price: %v\n", consumerID, price)
	}
}

func producer(prices chan<- int) {
	for i := 0; i < 10; i++ {
		prices <- i
		time.Sleep(100 * time.Millisecond)
	}
	close(prices) // the single producer owns the channel, so it closes it
}

func main() {
	prices := make(chan int)
	go producer(prices)
	var consumerWg sync.WaitGroup
	for i := 0; i < 3; i++ {
		consumerWg.Add(1)
		go consumer(i, &consumerWg, prices)
	}
	consumerWg.Wait()
}
You’ve probably gotten the gist by now: a channel carries the messages as the producer writes them, and multiple consumers, tracked by a wait group, process the data as it arrives at their end.
Leaving you with something to think about: why did we need WaitGroups at all? Let me know your thoughts in the comments.
Conclusion
Producer and consumer patterns are quite common in today’s software systems, and I believe the concurrency patterns above will help you implement them much more efficiently. These patterns form the foundation of many of the more complex concurrency patterns you might encounter in larger software systems.
Stay tuned for our next blog, where I’ll go into the significance of WaitGroups and also try to explain how goroutines are handled by Go’s runtime. Until then, cheers!
