Learning Concurrency in Go (Simulated Search Service)

What is concurrency?
According to Wikipedia, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out of order or in partial order without affecting the final outcome. What does this mean in practice? Suppose the system is executing process A when process B arrives. If the system switches to B, runs it for a while, and then returns to A, processes A and B are said to be concurrent.
Concurrency is the composition of independently executing computations.
func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Second)
    }
}
Goroutines
A goroutine is an independently executing function, launched by a go statement. It has its own call stack, which grows and shrinks as required. It’s very cheap: it’s practical to have thousands, even hundreds of thousands, of goroutines. It’s not a thread. There might be only one thread in a program with thousands of goroutines. Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
In the example, we started a goroutine, it printed for a bit, and the program exited. The goroutines executed independently; they were not communicating or synchronizing their behavior in any way.
To do proper concurrent programming, goroutines need to communicate.
Communication
Our boring examples cheated: the main function couldn’t see the output from the other goroutine. It was just printed to the screen, where we pretended we saw a conversation. Real conversation requires communication.
Channels
A channel in Go provides a connection between two goroutines, allowing them to communicate.
// Declaring and initializing
var c chan int
c = make(chan int)
// or
c := make(chan int)

// Sending on a channel
c <- 1

// Receiving from a channel.
// The "arrow" indicates the direction of data flow.
value = <-c
Using channels
A channel connects the main and boring goroutines so they can communicate.
func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c) // Receive expression is just a value.
    }
    fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i) // Expression to be sent can be any suitable value.
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}
Synchronization
Receiving from a channel is a blocking operation: the receiver has to wait until a value is there.
When the main function executes <-c, it will wait for a value to be sent.
Similarly, when the boring function executes c <- value, it waits for a receiver to be ready.
A sender and receiver must both be ready to play their part in the communication. Otherwise we wait until they are.
Thus channels both communicate and synchronize in a single operation.
The Go approach
Don’t communicate by sharing memory; share memory by communicating.
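One way to read this motto, sketched under the assumption of a hypothetical sumSquares helper: each goroutine hands its result over a channel, and only the collecting goroutine ever touches the running total.

```go
package main

import "fmt"

// sumSquares computes each square in its own goroutine and collects the
// results over a channel, so no goroutine writes to shared state.
func sumSquares(nums []int) int {
	c := make(chan int)
	for _, n := range nums {
		go func(n int) { c <- n * n }(n)
	}
	total := 0
	for range nums {
		total += <-c // only this goroutine ever writes total
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // prints 30
}
```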
Generator: function that returns a channel
Channels are first-class values, just like strings or integers.
c := boring("boring!") // Function returning a channel.
for i := 0; i < 5; i++ {
    fmt.Printf("You say: %q\n", <-c) // Receive expression is just a value.
}
fmt.Println("You're boring; I'm leaving.")

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
    c := make(chan string)
    go func() { // We launch the goroutine from inside the function.
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i) // Expression to be sent can be any suitable value.
            time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
        }
    }()
    return c // Return the channel to the caller.
}
Channels as a handle on a service
Our boring function returns a channel that lets us communicate with the boring service it provides.
We can have more instances of the service.
func main() {
    joe := boring("Joe")
    ann := boring("Ann")
    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }
    fmt.Println("You're both boring; I'm leaving.")
}
Inside the for loop we read a value from joe and then from ann. Because channel operations synchronize, if ann is ready to send but joe isn’t, ann stays blocked until main gets around to receiving from her. To get around this we can write a fan-in function, or multiplexer.
Multiplexing
These programs make joe and ann count in lockstep. We can instead use a fan-in function to let whoever is ready talk.
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 5; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}
But what if we want them to take turns in lockstep again?
Restoring sequence
Send a channel on a channel, making a goroutine wait its turn.
Receive all messages, then enable them again by sending a private channel.
First we define a message type that contains a channel for the reply.
type Message struct {
    str  string
    wait chan bool
}
Each speaker must wait for a go-ahead.
// In main:
for i := 0; i < 5; i++ {
    msg1 := <-c; fmt.Println(msg1.str)
    msg2 := <-c; fmt.Println(msg2.str)
    msg1.wait <- true
    msg2.wait <- true
}

// In boring:
waitForIt := make(chan bool) // Shared between all messages.

c <- Message{fmt.Sprintf("%s: %d", msg, i), waitForIt}
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
<-waitForIt
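The fragments above leave out how the pieces fit together. A possible assembly is sketched below; it assumes that boring and fanIn are adapted to carry Message values instead of strings, and it shortens the sleep so the program finishes quickly.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Message carries the text plus a channel on which the speaker waits for a go-ahead.
type Message struct {
	str  string
	wait chan bool
}

// boring returns a channel of Messages. After each send the speaker
// blocks until the receiver signals on the message's wait channel.
func boring(msg string) <-chan Message {
	c := make(chan Message)
	waitForIt := make(chan bool) // shared between all messages from this speaker
	go func() {
		for i := 0; ; i++ {
			c <- Message{fmt.Sprintf("%s: %d", msg, i), waitForIt}
			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
			<-waitForIt
		}
	}()
	return c
}

// fanIn forwards messages from both inputs onto one channel.
func fanIn(input1, input2 <-chan Message) <-chan Message {
	c := make(chan Message)
	go func() { for { c <- <-input1 } }()
	go func() { for { c <- <-input2 } }()
	return c
}

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 5; i++ {
		msg1 := <-c
		fmt.Println(msg1.str)
		msg2 := <-c
		fmt.Println(msg2.str)
		msg1.wait <- true // let both speakers continue
		msg2.wait <- true
	}
	fmt.Println("You're both boring; I'm leaving.")
}
```

Within each round the two speakers may arrive in either order, but neither can start round i+1 before main has released both, so the counters stay in step.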
Select
A control structure unique to concurrency.
The reason channels and goroutines are built into the language.
The select statement provides another way to handle multiple channels. It’s like a switch, but each case is a communication:
- All channels are evaluated.
- Selection blocks until one communication can proceed, which then does.
- If multiple can proceed, select chooses pseudo-randomly.
- A default clause, if present, executes immediately if no channel is ready.
select {
case v1 := <-c1:
    fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
    fmt.Printf("received %v from c2\n", v2)
case c3 <- 23:
    fmt.Printf("sent %v to c3\n", 23)
default:
    fmt.Printf("no one was ready to communicate\n")
}
If there is no default, the select blocks until one of the communications can proceed. If there is a default, it runs whenever nobody can proceed right away.
Sometimes multiple channels are ready at the same time. When that happens, the select statement chooses one of them pseudo-randomly, so you can’t depend on the order in which the communications will proceed.
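A common use of the default clause is a non-blocking receive. The sketch below uses a hypothetical tryReceive helper and a buffered channel (not otherwise covered in this article) so that a value can sit waiting without a second goroutine.

```go
package main

import "fmt"

// tryReceive returns a value from c if one is ready, without blocking.
func tryReceive(c <-chan string) (string, bool) {
	select {
	case v := <-c:
		return v, true
	default: // runs immediately when no value is ready
		return "", false
	}
}

func main() {
	c := make(chan string, 1) // buffered so a value can sit waiting
	_, ok := tryReceive(c)
	fmt.Println(ok) // false: nothing has been sent yet
	c <- "hello"
	v, ok := tryReceive(c)
	fmt.Println(v, ok) // hello true
}
```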
Fan-in using select
Rewrite our original fanIn function. Only one goroutine is needed.
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1: c <- s
            case s := <-input2: c <- s
            }
        }
    }()
    return c
}
The difference is that the previous function started two goroutines, one per input channel, each copying its input to the output. The function above starts a single goroutine that runs one copy loop, selects whichever of the two inputs is ready, and passes the data along. Both functions have exactly the same behaviour, except that we launch only one goroutine inside the fanIn function.
Timeout using select
Select can be used to time out a communication. The time.After function returns a channel that blocks for the specified duration. After the interval, the channel delivers the current time, once.
func main() {
    c := boring("joe")
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-time.After(1 * time.Second):
            fmt.Println("You're too slow.")
            return
        }
    }
}
Timeout for whole conversation using select
Create the timer once, outside the loop, to time out the entire conversation. (In the previous program, we had a timeout for each message.)
func main() {
    c := boring("joe")
    timeout := time.After(5 * time.Second)
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-timeout:
            fmt.Println("You talk too much.")
            return
        }
    }
}
Quit channel
Instead of timing out, we can say deterministically: OK, I am done, stop now.
We can turn this around and tell joe to stop when we’re tired of listening to him.
// In main:
quit := make(chan bool)
c := boring("joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- true

// In boring:
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
    // do nothing
case <-quit:
    return
}
What if we need to do some cleanup after case <-quit: is executed? After receiving the message to stop, the goroutine might have cleanup functions to run.
Receive on quit channel
How do we know it’s finished? Wait for it to tell us it’s done: receive on the quit channel.
// In main:
quit := make(chan string)
c := boring("joe", quit)
for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
quit <- "Bye"
fmt.Printf("Joe says: %q\n", <-quit)

// In boring:
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
    // do nothing
case <-quit:
    cleanup()
    quit <- "See you!"
    return
}
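Putting the two quit-channel fragments together gives a complete program along the following lines. This is a sketch: cleanup is a hypothetical placeholder, and the loop count is fixed at three so the output is predictable.

```go
package main

import "fmt"

// boring sends messages on the returned channel until it receives on quit,
// then runs cleanup and confirms on the same quit channel.
func boring(msg string, quit chan string) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			select {
			case c <- fmt.Sprintf("%s: %d", msg, i):
				// message delivered; keep going
			case <-quit:
				cleanup()
				quit <- "See you!"
				return
			}
		}
	}()
	return c
}

// cleanup is a hypothetical placeholder for whatever teardown is needed.
func cleanup() {}

func main() {
	quit := make(chan string)
	c := boring("Joe", quit)
	for i := 0; i < 3; i++ {
		fmt.Println(<-c)
	}
	quit <- "Bye"
	fmt.Printf("Joe says: %q\n", <-quit) // Joe says: "See you!"
}
```

Because the quit channel is unbuffered, main's send completes only once the goroutine has received it, so the value main reads back is the goroutine's reply and never its own "Bye".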
Example: Google Search Engine
What does Google search do? Given a query, return a page of search results (and some ads).
How do we get the search results? Send the query to Web search, Image search, YouTube, Maps, News, etc., then mix the results.
How do we implement this?
A bunch of independently acting backends search for results on your behalf. How do we structure the program?
Google Search: A fake framework
We can simulate the search function, much as we simulated conversation before.
// Result is a string-based type, as implied by the conversion in fakeSearch.
type Result string

type Search func(query string) Result

var (
    Web   = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

func fakeSearch(kind string) Search {
    return func(query string) Result {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(results)
    fmt.Println(elapsed)
}
Google Search 1.0
func Google(query string) (results []Result) {
    results = append(results, Web(query))
    results = append(results, Image(query))
    results = append(results, Video(query))
    return
}
Here Google calls the three search functions in sequence. The image search starts only after the web search is complete, and the video search starts only after the image search is complete. Why not run them in goroutines?
Google Search 2.0
Run the web, image and video searches concurrently, and wait for all results. No locks. No condition variables. No callbacks.
func Google(query string) (results []Result) {
    c := make(chan Result)
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()

    for i := 0; i < 3; i++ {
        result := <-c
        results = append(results, result)
    }
    return
}
Now we wait only as long as the slowest server takes.
Google Search 2.1
Sometimes services take a long time; they can be really, really slow. Don’t wait for slow servers. Let’s say we don’t want to wait more than 80 milliseconds.
c := make(chan Result)
go func() { c <- Web(query) }()
go func() { c <- Image(query) }()
go func() { c <- Video(query) }()

timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
    select {
    case result := <-c:
        results = append(results, result)
    case <-timeout:
        fmt.Println("timed out")
        return
    }
}
return
How to avoid timeout
How do we avoid discarding results from slow servers? Replicate the servers. Send requests to multiple replicas, and use the first response.
func First(query string, replicas ...Search) Result {
    c := make(chan Result)
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}

func main() {
    rand.Seed(time.Now().UnixNano())
    start := time.Now()
    result := First("golang", fakeSearch("replica 1"), fakeSearch("replica 2"))
    elapsed := time.Since(start)
    fmt.Println(result)
    fmt.Println(elapsed)
}
If we run multiple instances of a service, one of them is likely to respond before the timeout expires. If one replica is having a problem, another can still answer quickly.
Google Search 3.0
Reduce tail latency using replicated search servers.
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) }()
go func() { c <- First(query, Image1, Image2) }()
go func() { c <- First(query, Video1, Video2) }()

timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
    select {
    case result := <-c:
        results = append(results, result)
    case <-timeout:
        fmt.Println("timed out")
        return
    }
}
return
In just a few simple transformations we used Go’s concurrency primitives to convert a slow, sequential, failure-sensitive program into one that is fast, concurrent, replicated and robust.
Don’t overdo it
- They’re fun to play with, but don’t overuse these ideas.
- Goroutines and channels are big ideas. They’re tools for program construction. But sometimes all you need is a reference counter.
- Go has “sync” and “sync/atomic” packages that provide mutexes, condition variables, etc. They provide tools for smaller problems.
- Often, these things will work together to solve a bigger problem.
- Always use the right tool for the job.
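As an illustration of the "smaller problems" mentioned in the list above, a shared counter needs nothing more than a mutex. The Counter type and countTo helper are hypothetical names chosen for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is a mutex-protected counter.
type Counter struct {
	mu sync.Mutex
	n  int
}

// Inc adds one to the counter.
func (c *Counter) Inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// Value returns the current count.
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// countTo increments one shared counter from several goroutines and
// returns the final value once all of them have finished.
func countTo(workers, each int) int {
	var c Counter
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < each; i++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countTo(100, 1000)) // prints 100000
}
```

A channel-based design would also work here, but for a single integer the mutex is shorter and easier to reason about.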
Conclusions
Goroutines and channels make it easy to express complex operations dealing with
- multiple inputs
- multiple outputs
- timeouts
- failure
Thanks for reading! Hope this was helpful.