Go & Threads Introduction

One thing you might notice early on in the memory model document[1] is that it says if you need to read and understand the formal model, you're being too clever, and we think that's good advice. Focus on how to write correct code; don't focus too much on the happens-before relation and on being able to reason about exactly why incorrect code is incorrect. We don't really care about that; we just want to be able to write correct code and call it a day.

Code Samples Time

package main

import "time"
import "sync"

var done bool
var mu sync.Mutex

func main() {
	time.Sleep(1 * time.Second)
	println("started")
	go periodic()
	time.Sleep(5 * time.Second) // wait for a while so we can observe what ticker does
	mu.Lock()
	done = true
	mu.Unlock()
	println("cancelled")
	time.Sleep(3 * time.Second) // observe no output
}

func periodic() {
	for {
		println("tick")
		time.Sleep(1 * time.Second)
		mu.Lock()
		if done {
			mu.Unlock() // release the lock before returning
			return
		}
		mu.Unlock()
	}
}

Question

Why do we need the locks? Is it OK to just delete all of them?

Answer

If you have accesses to shared variables, and you want writes to them to be observed across different threads, you need to be holding a lock before you read or write those shared variables.

The compiler is allowed to generate low-level code that does something a little different from what you intuitively thought would happen here. This is why Go has the fancy memory model.

In this particular case, the Go compiler would be allowed to optimize this by lifting the read of done outside the for loop: read the shared variable once, and if done is false, turn the loop body into an infinite loop. The way periodic would then be written, it uses no synchronization primitives, no mutex lock or unlock, no channel sends or receives, so it is not guaranteed to observe any mutations made by other concurrently running threads.

So the memory model is pretty subtle, and it's really hard to think about exactly why incorrect programs are incorrect. But if you follow some general rules, like holding locks before you read or mutate shared variables, you can avoid thinking about these nasty issues.
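
To make this concrete, here is a minimal sketch (not from the lecture, just the program above with the locks deleted) of the unsynchronized variant being described. Because periodic now uses no synchronization primitives, the compiler would be allowed to read done just once, and the write in main might never be observed:

package main

import "time"

var done bool // shared variable with no lock protecting it: this program is incorrect

func main() {
	go periodic()
	time.Sleep(5 * time.Second)
	done = true // unsynchronized write
	println("cancelled")
	time.Sleep(3 * time.Second) // ticks may keep printing: periodic may never see the write
}

func periodic() {
	for {
		println("tick")
		time.Sleep(1 * time.Second)
		if done { // unsynchronized read; the compiler may hoist it out of the loop
			return
		}
	}
}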

Code Sample Increment a Counter Atomically – Mutex

package main

import "sync"
import "time"

func main() {
	counter := 0
	var mu sync.Mutex
	for i := 0; i < 1000; i++ {
		go func() {
			mu.Lock()
			defer mu.Unlock()
			counter = counter + 1
		}()
	}

	time.Sleep(1 * time.Second) // crude wait for the goroutines to finish
	mu.Lock()
	println(counter)
	mu.Unlock()
}

Question

Can we remove the locks?

Answer

If we remove the locks, we will miss some updates, because these goroutines run concurrently and can read and write counter at the same time.

We want to ensure that this entire section (read counter, add one, write it back) happens atomically, and the way you make blocks of code run atomically is by using locks.
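
To see what goes wrong without the locks, here is a sketch (just the program above with the locks removed, not lecture code). counter = counter + 1 is really a read, an add, and a write, so two goroutines can both read the same old value and one of the increments is lost; running this with Go's race detector (go run -race) will report the races:

package main

import "time"

func main() {
	counter := 0
	for i := 0; i < 1000; i++ {
		go func() {
			counter = counter + 1 // data race: unsynchronized read-modify-write
		}()
	}

	time.Sleep(1 * time.Second)
	println(counter) // often prints less than 1000
}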

Code Sample Vote Count – Condition Variable

package main

import "time"
import "math/rand"

func main() {
	rand.Seed(time.Now().UnixNano())

	count := 0
	finished := 0

	for i := 0; i < 10; i++ {
		go func() {
			vote := requestVote()
			if vote {
				count++ // data race: shared counter written without a lock
			}
			finished++ // data race: shared counter written without a lock
		}()
	}

	for count < 5 && finished != 10 { // busy-wait; also reads shared variables without a lock
		// wait
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
}

func requestVote() bool {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return rand.Int() % 2 == 0
}

Question

What will happen in this code? How to optimize?

Answer

The shared variables count and finished should only be accessed inside a section protected by a lock.
Also, the waiting for loop busy-waits and will burn up 100% of a CPU core, so we use a condition variable instead.

Code

package main

import "sync"
import "time"
import "math/rand"

func main() {
	rand.Seed(time.Now().UnixNano())

	count := 0
	finished := 0
	var mu sync.Mutex
	cond := sync.NewCond(&mu)

	for i := 0; i < 10; i++ {
		go func() {
			vote := requestVote()
			mu.Lock()
			defer mu.Unlock()
			if vote {
				count++
			}
			finished++
			cond.Broadcast() // wake up any goroutine waiting on cond
		}()
	}

	mu.Lock()
	for count < 5 && finished != 10 {
		cond.Wait() // releases mu while waiting, re-acquires it before returning
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
	mu.Unlock()
}

func requestVote() bool {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return rand.Int() % 2 == 0
}
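
Note that cond.Wait() has to be called with mu held, and the waiting loop re-checks the condition each time it wakes up: a Broadcast only says that something changed, so the waiter must test count and finished again before deciding whether to stop waiting.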

Code Sample Unbuffered Channel Deadlock

package main

func main() {
	c := make(chan bool)
	c <- true
	<-c
}

Question

What will happen?

Answer

This program would get stuck forever. A send on an unbuffered channel blocks until another goroutine is ready to receive from it (the receive from an unbuffered channel happens before the send completes). Here main is the only goroutine, so there is no receiver for the send on c; the send blocks forever, and the Go runtime reports a deadlock.

For a buffered channel, a send happens before the corresponding receive completes, and a send only blocks when the buffer is full. If nothing is ever sent on a buffered channel, a receive from it will block forever.
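
For contrast, here is a sketch of the same program with a buffered channel of capacity 1; the send completes immediately because the value fits in the buffer, so this version runs to completion:

package main

func main() {
	c := make(chan bool, 1) // buffer with room for one element
	c <- true               // does not block: the value goes into the buffer
	<-c                     // receives the buffered value
}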

Question

What do we often use channels for?

Answer

  • Producer–consumer pattern (the first sample below)
  • Achieving something similar to what a sync.WaitGroup does (the second sample below)
package main

import "time"
import "math/rand"

func main() {
	c := make(chan int)

	for i := 0; i < 4; i++ {
		go doWork(c)
	}

	for {
		v := <-c
		println(v)
	}
}

func doWork(c chan int) {
	for {
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		c <- rand.Int()
	}
}
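
The second sample uses a channel to wait for a fixed number of goroutines to finish, receiving once per goroutine:
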
package main

func main() {
	done := make(chan bool)
	for i := 0; i < 5; i++ {
		go func(x int) {
			sendRPC(x)
			done <- true
		}(i)
	}
	for i := 0; i < 5; i++ {
		<-done
	}
}

func sendRPC(i int) {
	println(i)
}
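
For comparison, here is the same wait-for-all pattern written with sync.WaitGroup; this is a sketch of the standard-library alternative, not code from the lecture:

package main

import "sync"

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(x int) {
			defer wg.Done() // mark this goroutine as finished
			sendRPC(x)
		}(i)
	}
	wg.Wait() // blocks until all five goroutines have called Done
}

func sendRPC(i int) {
	println(i)
}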

(Covers up to the 46-minute mark of the lecture; to be continued.)

Reference

[1]. The Go Memory Model. https://golang.org/ref/mem
[2]. Lecture 5: Go, Threads, and Raft. https://www.youtube.com/watch?v=UzzcUS2OHqo
[3]. Raft. https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14

