Learning Concurrency in Go (Simulated Search Service)

Binod Kafle
9 min read · Oct 6, 2020

What is concurrency?

According to Wikipedia, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome. So what does this definition mean? Say we have two processes, A and B. While the system is executing process A, process B arrives; the system executes B and then returns to executing A. Processes A and B are then said to be concurrent: their executions overlap in time, even if they never run at the same instant.

Concurrency is the composition of independently executing computations.

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Second)
	}
}

Goroutines

A goroutine is an independently executing function, launched by a go statement. It has its own call stack, which grows and shrinks as required. Goroutines are very cheap: it’s practical to have thousands, even hundreds of thousands, of them. A goroutine is not a thread. There might be only one thread in a program with thousands of goroutines; instead, goroutines are multiplexed dynamically onto threads as needed to keep them all running.

In the example, we started a goroutine, let it print for a bit, and exited. The goroutines executed independently; they were not communicating or synchronizing their behavior in any way.

To do proper concurrent programming, goroutines need to communicate.

Communication

Our boring examples cheated: the main function couldn’t see the output from the other goroutine. It was just printed to the screen, where we pretended we saw a conversation. Real conversation requires communication.

Channels

A channel in Go provides a connection between two goroutines, allowing them to communicate.

// Declaring and initializing
var c chan int
c = make(chan int)
// or
c := make(chan int)
// Sending on a channel
c <- 1
// Receiving from a channel.
// The "arrow" indicates the direction of data flow.
value = <-c
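The lines above are fragments. A minimal runnable sketch of the same three operations (make, send, receive), using a hypothetical helper named `sendAndReceive`. The send runs in a separate goroutine: on an unbuffered channel a send blocks until a receiver is ready, so sending and receiving from main alone would deadlock.

```go
package main

import "fmt"

// sendAndReceive creates a channel, sends one value from a goroutine,
// and receives it in the calling goroutine.
func sendAndReceive() int {
	c := make(chan int) // unbuffered channel of ints
	go func() {
		c <- 1 // send: blocks until someone receives
	}()
	value := <-c // receive: blocks until the goroutine sends
	return value
}

func main() {
	fmt.Println(sendAndReceive()) // prints 1
}
```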

Using channels

A channel connects the main and boring goroutines so they can communicate.

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := make(chan string)
	go boring("boring!", c)
	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-c) // Receive expression is just a value.
	}
	fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string, c chan string) {
	for i := 0; ; i++ {
		c <- fmt.Sprintf("%s %d", msg, i) // Expression to be sent can be any suitable value.
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

Synchronization

When we receive from a channel, we have to wait for a value to be there; receiving is a blocking operation.

When the main function executes <-c, it will wait for a value to be sent.

Similarly, when the boring function executes c <- value, it waits for a receiver to be ready.

A sender and receiver must both be ready to play their part in the communication. Otherwise we wait until they are.

Thus channels both communicate and synchronize in a single operation.
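Because a send completes only when the matching receive happens, a channel can also serve purely as a "done" signal. A small sketch, using a hypothetical helper `sumInBackground` that does its work in a goroutine while the caller waits on a `done` channel:

```go
package main

import "fmt"

// sumInBackground adds 1..n in a separate goroutine and waits for it to finish.
func sumInBackground(n int) int {
	total := 0
	done := make(chan bool)
	go func() {
		for i := 1; i <= n; i++ {
			total += i
		}
		done <- true // blocks until the receive below is ready
	}()
	<-done // blocks until the goroutine sends; after this, total is safe to read
	return total
}

func main() {
	fmt.Println(sumInBackground(10)) // prints 55
}
```

No mutex is needed to read `total`: the Go memory model guarantees that everything the goroutine did before sending on `done` is visible once the receive completes.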

The Go approach

Don’t communicate by sharing memory; share memory by communicating.
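A small sketch of the proverb, using a hypothetical `sumOfIDs` helper: instead of many goroutines updating one shared counter behind a mutex, each goroutine sends its contribution over a channel, and a single goroutine owns the running total.

```go
package main

import "fmt"

// sumOfIDs starts n workers; each one communicates its id over a channel
// instead of adding it to a shared variable under a lock.
// Only the goroutine running sumOfIDs ever touches sum.
func sumOfIDs(n int) int {
	ids := make(chan int)
	for w := 1; w <= n; w++ {
		go func(id int) {
			ids <- id // hand the value over; no shared memory is written
		}(w)
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-ids // receive exactly one value per worker
	}
	return sum
}

func main() {
	fmt.Println(sumOfIDs(100)) // prints 5050
}
```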

Generator: function that returns a channel

Channels are first-class values, just like strings or integers.

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	c := boring("boring!") // Function returning a channel.
	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-c)
	}
	fmt.Println("You're boring; I'm leaving.")
}

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return the channel to the caller.
}
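Because channels are ordinary values, a generator's channel can be handed to other functions like any other argument. A small sketch with two hypothetical helpers: `count`, a sleep-free generator in the same shape as boring, and `take`, which collects the next n values from any string channel.

```go
package main

import "fmt"

// count is a generator: it returns a receive-only channel and
// produces values on it from a goroutine it launches itself.
// Unlike boring, it does not sleep, which keeps the example fast.
func count(msg string) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
		}
	}()
	return c
}

// take receives the next n values from a channel passed in as an argument.
func take(c <-chan string, n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-c)
	}
	return out
}

func main() {
	fmt.Println(take(count("joe"), 3)) // prints [joe 0 joe 1 joe 2]
}
```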

Channels as a handle on a service

Our boring function returns a channel that lets us communicate with the boring service it provides.

We can have more instances of the service.

func main() {
	joe := boring("Joe")
	ann := boring("Ann")
	for i := 0; i < 5; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ann)
	}
	fmt.Println("You're both boring; I'm leaving.")
}

Inside the for loop we read a value from joe and then one from ann. Because channel communication is synchronous, if ann is ready to send a value but joe isn’t, ann stays blocked until main gets around to receiving from her. To get around this, we can write a fan-in function, or multiplexer.

Multiplexing

These programs make joe and ann count in lockstep. We can use a fan-in function to let whoever is ready talk.

func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() { for { c <- <-input1 } }()
	go func() { for { c <- <-input2 } }()
	return c
}

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 5; i++ {
		fmt.Println(<-c)
	}
	fmt.Println("You're both boring; I'm leaving.")
}

But what if we want them back in lockstep, taking turns?

Restoring sequence

Send a channel on a channel, making each goroutine wait its turn.

Receive all messages, then enable them again by sending on a private channel.

First we define a message type that contains a channel for the reply.

type Message struct {
	str  string
	wait chan bool
}

Each speaker must wait for a go-ahead.

// In main: receive one message from each speaker, then let both continue.
for i := 0; i < 5; i++ {
	msg1 := <-c
	fmt.Println(msg1.str)
	msg2 := <-c
	fmt.Println(msg2.str)
	msg1.wait <- true
	msg2.wait <- true
}

// Inside each speaker's loop: send a message, then wait for the go-ahead.
waitForIt := make(chan bool) // Shared between all messages.
c <- Message{fmt.Sprintf("%s: %d", msg, i), waitForIt}
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
<-waitForIt

Select

A control structure unique to concurrency.

It’s the reason channels and goroutines are built into the language.

The select statement provides another way to handle multiple channels. It’s like a switch, but each case is a communication:

  • All channels are evaluated.
  • Selection blocks until one communication can proceed, which then does.
  • If multiple can proceed, select chooses pseudo-randomly.
  • A default clause, if present, executes immediately if no channel is ready.
select {
case v1 := <-c1:
	fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
	fmt.Printf("received %v from c2\n", v2)
case c3 <- 23:
	fmt.Printf("sent %v to c3\n", 23)
default:
	fmt.Printf("no one was ready to communicate\n")
}

If there is no default clause, the select blocks until one of the communications can proceed (forever, if none ever does). If there is a default clause and nobody can proceed right away, the default runs immediately.

Sometimes several channels are ready at the same time. When that happens, select picks one of them pseudo-randomly, so you can’t depend on the order in which the communications will proceed.
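A small sketch of both behaviors, using two hypothetical helpers. `tryReceive` uses a default clause to make a receive non-blocking. `countChoices` makes both cases ready over and over and counts which one select picks. It relies on buffered channels (`make(chan int, 1)`), which the article doesn't cover: a send on a buffered channel with free space completes without a waiting receiver.

```go
package main

import "fmt"

// tryReceive never blocks: if c has no value ready, the default clause runs.
func tryReceive(c <-chan int) (int, bool) {
	select {
	case v := <-c:
		return v, true
	default:
		return 0, false
	}
}

// countChoices makes both cases ready n times and counts which one select picks.
func countChoices(n int) (fromA, fromB int) {
	a := make(chan int, 1)
	b := make(chan int, 1)
	for i := 0; i < n; i++ {
		a <- 1
		b <- 2
		select {
		case <-a:
			fromA++
			<-b // drain the other channel so both start empty next time
		case <-b:
			fromB++
			<-a
		}
	}
	return fromA, fromB
}

func main() {
	_, ok := tryReceive(make(chan int))
	fmt.Println("value ready:", ok) // prints "value ready: false"

	a, b := countChoices(1000)
	fmt.Println("a chosen:", a, "b chosen:", b) // roughly even split; varies per run
}
```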

Fan-in using select

Rewrite our original fanIn function. Only one goroutine is needed.

func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:
				c <- s
			case s := <-input2:
				c <- s
			}
		}
	}()
	return c
}

The difference is that the previous version started two goroutines, one per input channel, each copying its input to the output. This version starts a single goroutine that runs one copy loop, uses select to pick whichever input is ready, and forwards the value. Both functions behave the same; this one just launches one goroutine inside fanIn instead of two.

Timeout using select

Select can be used to timeout a communication. The time.After function returns a channel that blocks for the specified duration. After the interval, the channel delivers the current time, once.

func main() {
	c := boring("joe")
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-time.After(1 * time.Second):
			fmt.Println("You're too slow.")
			return
		}
	}
}

Timeout for whole conversation using select

Create the timer once, outside the loop, to time out the entire conversation. (In the previous program, we had a timeout for each message.)

func main() {
	c := boring("joe")
	timeout := time.After(5 * time.Second)
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-timeout:
			fmt.Println("You talk too much.")
			return
		}
	}
}

Quit channel

Instead of timing out, we can deterministically say: OK, I’m done, stop now.

We can turn this around and tell joe to stop when we’re tired of listening to him.

quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- {
	fmt.Println(<-c)
}
quit <- true

// Inside boring's loop:
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
	// do nothing
case <-quit:
	return
}

But what if, after the case <-quit: branch runs, we need to do some cleanup? Once the goroutine gets the message to stop, it might have cleanup functions to run before it exits.

Receive on quit channel

How do we know it’s finished? Wait for it to tell us it’s done: receive on the quit channel.

quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- {
	fmt.Println(<-c)
}
quit <- "Bye"
fmt.Printf("Joe says: %q\n", <-quit)

// Inside boring's loop:
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
	// do nothing
case <-quit:
	cleanup()
	quit <- "See you!"
	return
}
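A runnable sketch combining the two halves, assuming boring takes a hypothetical `cleanup` callback as a stand-in for real cleanup work and uses a short fixed pause between messages so the demo runs quickly:

```go
package main

import (
	"fmt"
	"time"
)

// boring sends numbered messages until told to quit. On quit it runs cleanup
// and then replies on the same channel, so the caller knows it has finished.
func boring(msg string, quit chan string, cleanup func()) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			select {
			case c <- fmt.Sprintf("%s: %d", msg, i):
				time.Sleep(10 * time.Millisecond) // pacing, shortened for the demo
			case <-quit:
				cleanup()
				quit <- "See you!"
				return
			}
		}
	}()
	return c
}

func main() {
	cleanedUp := false
	quit := make(chan string)
	c := boring("Joe", quit, func() { cleanedUp = true })
	for i := 0; i < 3; i++ {
		fmt.Println(<-c)
	}
	quit <- "Bye"
	fmt.Printf("Joe says: %q\n", <-quit)  // prints: Joe says: "See you!"
	fmt.Println("cleaned up:", cleanedUp) // prints: cleaned up: true
}
```

Reading `cleanedUp` in main is safe without a lock: the goroutine sets it before sending its reply, and main reads it only after receiving that reply.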

Example: Google Search Engine

What does Google search do? Given a query, return a page of search results (and some ads).

How do we get the search results? Send the query to Web search, Image search, YouTube, Maps, News, etc., then mix the results.

How do we implement this?

A bunch of independently acting backends search for results and find them for you. How do we structure that?

Google Search: A fake framework

We can simulate the search function, much as we simulated conversation before.

import (
	"fmt"
	"math/rand"
	"time"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

// Result is just a string in this simulation.
type Result string

type Search func(query string) Result

func fakeSearch(kind string) Search {
	return func(query string) Result {
		// Pretend the backend takes up to 100ms to answer.
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

func main() {
	rand.Seed(time.Now().UnixNano()) // Unnecessary since Go 1.20, which seeds math/rand automatically.
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

Google Search 1.0

func Google(query string) (results []Result) {
	results = append(results, Web(query))
	results = append(results, Image(query))
	results = append(results, Video(query))
	return
}

Here Google calls the three search functions one after another: the image search starts only after the web search completes, and the video search only after the image search completes, so the total time is the sum of all three. Why don’t we run them in goroutines?

Google Search 2.0

Run the web, image and video searches concurrently, and wait for all results. No locks. No condition variables. No callbacks.

func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- Web(query) }()
	go func() { c <- Image(query) }()
	go func() { c <- Video(query) }()

	for i := 0; i < 3; i++ {
		result := <-c
		results = append(results, result)
	}
	return
}

Now we wait only for the slowest server, instead of the sum of all three.

Google Search 2.1

Sometimes services take a long time; they can be really, really slow. Don’t wait for slow servers. Let’s say we don’t want to wait more than 80 milliseconds:

func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- Web(query) }()
	go func() { c <- Image(query) }()
	go func() { c <- Video(query) }()

	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}

How to avoid timeout

How do we avoid discarding results from slow servers? Replicate the servers. Send requests to multiple replicas, and use the first response.

func First(query string, replicas ...Search) Result {
	c := make(chan Result)
	searchReplica := func(i int) { c <- replicas[i](query) }
	for i := range replicas {
		go searchReplica(i)
	}
	return <-c
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	result := First("golang",
		fakeSearch("replica 1"),
		fakeSearch("replica 2"))
	elapsed := time.Since(start)
	fmt.Println(result)
	fmt.Println(elapsed)
}

If we run multiple instances of a service, one of them is likely to respond before the timeout expires: if one replica is having problems, another can still answer quickly.

Google Search 3.0

Reduce tail latency using replicated search servers.

func Google(query string) (results []Result) {
	// Web1, Web2, Image1, ... are replicas of each backend (for example, made with fakeSearch).
	c := make(chan Result)
	go func() { c <- First(query, Web1, Web2) }()
	go func() { c <- First(query, Image1, Image2) }()
	go func() { c <- First(query, Video1, Video2) }()

	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}

In just a few simple transformations, we used Go’s concurrency primitives to convert a slow, sequential, failure-sensitive program into one that is fast, concurrent, replicated, and robust.

Don’t overdo it

  • They’re fun to play with, but don’t overuse these ideas.
  • Goroutines and channels are big ideas. They’re tools for program construction. But sometimes all you need is a reference counter.
  • Go has “sync” and “sync/atomic” packages that provide mutexes, condition variables, etc. They provide tools for smaller problems.
  • Often, these things will work together to solve a bigger problem.
  • Always use the right tool for the job.

Conclusions

Goroutines and channels make it easy to express complex operations dealing with:

  • multiple inputs
  • multiple outputs
  • timeouts
  • failure

Thanks for reading! Hope this was helpful.
