本文将简要介绍golang中的并发和线程间通信机制,主要围绕goroutine和channel展开。并通过两个demo说明二者的具体使用方法。通过channel我们可以实现lock free的数据结构,这在的第二个demo中会有所体现。

参考书目:《Go语言圣经》、《Go语言高级编程》

Golang并发

  如今,Web服务器每时每刻都会处理成千上万的请求,而Go语言通过其轻量级的并发单元——Goroutine可以实现非常强大的并发性能。Goroutine的创建成本极低、内存占用极少,同时,Goroutine运行在用户态下,所以Goroutine间的切换无需繁杂的上下文切换,这些特性都铸就了Golang适合高并发的特性。Goroutine通过Channel来进行相互之间的安全通信,这种通信方式体现了Golang的并发编程哲学:不要通过共享内存来通信,而应通过通信来共享内存。

  Go语言的并发体系理论是CSP(Communicating Sequential Process,通讯顺序进程),他是一种现代化的并发编程模型,在这种模型下,值会在各个运行实例(goroutine)中传递。多线程共享内存则是当今很多语言采用的更为传统的并发模型。

Goroutine

  在go语言中,每一个并发执行的单元叫做goroutine,其实我们只要编写过go语言程序就接触过goroutine,当我写下一个main函数时,编译运行后,就会有一个main函数运行在一个goroutine上。所以goroutine就相当于其它语言的线程,但又不同于线程,这一点在后面我会具体讨论。

  我们可以非常随意的启动一个goroutine,就像下面这样:

1
2
f()
go f()

这时就会有一个独立的goroutine去执行函数f,你可以在main这个goroutine中执行其它的操作。

接下来是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"time"
)

func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n)
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}

func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}

func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}

这个Demo会在一小段动画后输出:

Fibonacci(45) = 1134903170

  这里我们在main这个goroutine内启动了另一个goroutine去执行spinner(),当main()输出最终结果并返回后,所有的过肉体呢都会被直接打断,程序退出。除了main()函数退出之外,没有其他的编程方法可以让一个goroutine打断另一个goroutine,除非通过channel,这个我们后面会提到。

Groutine与系统线程

  了解goroutine和系统线程的区别可以帮助我们更好的理解为什么go语言非常适合高并发。

动态栈

  每个OS线程的栈大小都是固定的,一般为2MB,这个栈会被用来存储当前正在被调用或者挂起的函数的内部变量,2MB的大小对一个很小的goroutine来说存在着很大的资源浪费,或许它完全用不到那么大的空间。对一个go语言程序来说,我们可能创建成百上千个goroutine,如果每个goroutine都需要那么大的栈空间的话,那么我们就没有办法创建很多的goroutine,从而go也不在真正适合高并发。同时,2MB是个固定的大小,如果我们需要在一个goroutine中来运行一个复杂的深层次的递归的话,这个栈空间大小就会显得捉襟见肘。所以一个可以动态调整大小的栈被应用在go语言中。

  每个goroutine在初始化后都会拥有一个很小的栈作为其生命周期的开始,一般为2KB。这个栈的用途同OS线程一样。但是,它的大小并不是固定的,而是可以动态的收缩,最大可以到1GB。动态大小的栈主要有两种方法来实现,一种方法是通过链栈,但是这种方法存在一定的性能问题,如链栈中的片段在内存中不一定是连续的,这会导致更多的缓存未命中;同时,我们需要为栈增加很多指针空间来将栈链接起来,这也是一种资源的浪费。go语言主要通过slice和内存管理来实现动态栈,类似于c++中的vector的内存分配方式,当栈的大小需要调整是,先检查栈后是否可以追加空间,如果可以则追加空间,如果不可,则重新进行更大空间的内存分配,并复制当前栈中的内容到新的空间。所以在这里有一个小小的陷阱,不要用其他语言的指针长时间的保存go语言的变量地址,因为随着栈空间的调整,这个地址是会发生变化的。

Goroutine调度

  OS线程会被操作系统内核调度,这个调度过程需要繁杂的上下文切换,也就是说操作系统首先需要从用户态转换为内核态,再保存当前线程的执行现场,同时将需要调度的线程的现场恢复,这几步操作非常慢,因为需要多次的内存访问。Go的运行时包含了其他的调度器,这个调度器采用m:n调度,也就是说,go的运行时会在n个OS线程上多工调度m个goroutine。这个调度器的调度过程与内核的调度时相似的,但是这个调度器只关注单独的go程序中的goroutine,它并不是一个硬件定时器,也就是说只有当这个goroutine阻塞时,调度器才会使其休眠,并调度执行另一个goroutine。同时,这种调度方法是不需要进行内核的上下文切换的,所以调度goroutine要比调度一个OS线程快的多。

  我们可以通过一个变量GOMAXPROCS来决定有多少个OS线程同时执行go语言代码,它的默认值是CPU和核心数量。我们可以在运行代码时显示的指定这个值,如:

1
$ GOMAXPROCS = 1 go run example.go

这样就只有一个操作系统线程在运行example.go。

Groutine中的循环陷阱

  如果了解过go匿名函数(闭包)的话,你应该不会对下面这个例子陌生。匿名函数中的循环变量存在非常典型的快照问题。如果我们写出下面的代码:

1
2
3
4
5
6
for _, i := range list {
go func() {
// do something
use(i) // NOTE: incorrect
}()
}

这里的循环变量是被所有的匿名函数值所共享的,并且会在每次的循环迭代中更新。当一个goroutine开始执行use(i)时,i的值已经被循环更新,它看到的值是更新后的值,而不是goroutine被创建时的那个原本的值。我们可以通过显示的添加参数来解决这个问题,如:

1
2
3
4
5
6
for _, i := range list {
go func(i int) {
// do something
use(i) // correct
}(i)
}

这时每个goroutine都会接收到初始化它时传入的那个值了。

Channel

  Goroutine是go语言的并发体,channel就是他们之间的通讯机制,他可以通过一个goroutine向另一个goroutine发送信息,channel的创建和使用方式和goroutine一样简单,具体操作如下:

1
2
3
4
5
6
ch := make(chan int)   		// A channel send and receive int
var x int
ch <- x // send
y := <-ch // receive
<-ch // receive
close(ch) // close a channel

上述例子创建了一个无缓存的channel,当然我们也可以创建带缓存的channel:

1
cd = make(chan int, 10)		// A channel with capacity 10

同时,channel也可以是当方向的,如:

1
2
<-chan int
chan<- int

前者是一个只能发送int类型的channel,而后者只能接收。

  向无缓存的channel发送数据会导致发送者的goroutine阻塞,知道另一个goroutine执行接收操作。反之,如果接收操作发生在前,那么接收者的goroutine会阻塞直至另一个goroutine在该管道上执行发送。这个操作就像是两个goroutine之间做了一次同步操作,所以无缓存的channel也被称为同步channel。

  带缓存的channel内部会持有一个元素队列,队列的最大容量可在make中指定,向缓存channel中发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队头删除。如果队列满,那么发送的goroutine阻塞,如果队列空,那么接收goroutine阻塞。

Channel配合goroutine的一些用法

  有关goroutine和channel的概念就介绍这么多,还有很多没有办法在一篇文章中写完,可以参见参考书目,有更加详细的介绍。接下来会介绍一些channel配合goroutine的用法,也是一些代码技巧。

缓存Channel——控制并发数量

  当我们感受到goroutine的强大的并发特性之后,或许我们会写出最大并发化的代码,也就是无限的创建goroutine。但大部分时候我们需要控制好并发的程度,因为过大的并发会导致CPU资源被一个进程占尽,所以我们需要为其他进程预留CPU资源。

  我们可以通过阻塞Channel来控制并发数量,具体做法如下:

1
2
3
4
5
6
7
8
9
10
11
12
var limit = make(chan struct{}, 3)

func main() {
for _, w := range work {
go func() {
limit <- struct{}{}
w()
<-limit
}()
}
select{}
}

通过这个名为limit的channel我们可以确保每时每刻最多只有3个goroutine处于运行状态。

同步Channel——并发的安全退出

  有时我们需要创建的goroutine集体退出,也就是说我们从一个goroutine中得到了想要的结果后就不需要其他的goroutine继续执行下去了。首先看看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func worker(cancel chan bool) {
for {
select {
default:
fmt.Println("hello")
// work
case <-cancel:
// quit
}
}
}

func main() {
cancel := make(chan bool)
for i := 0; i < 10; i++ {
go worker(cancel)
}

time.Sleep(time.Second)
close(cancel)
}

我们通过关闭一个无缓存的管道来向所有的goroutine发送终止信号,所有从关闭的管道中接收的操作都会收到一个零值或者一个可选的失败的标志。但是这段代码还是不够稳健,有时我们需要等待所有的goroutine关闭后在退出主程序,以防goroutine泄漏,我们可以通过sync.WaitGroup来执行这个操作,集体做法不在这里展开。

Channel——实现LockFree的数据结构

  通过Channel我们可以实现一些lock free的并发访问操作,比如函数缓存。所谓函数缓存就是某些函数的调用通常需要非常长的时间(数据库访问,URL请求等),同时这个调用操作又需要多次执行,且每次访问都返回相同的值,这时我们就可以将函数的返回值缓存入一个map,每次调用前先访问map,看能否找到缓存,如果找不到,再去调用费时的函数,这个技巧可以优化一些程序的执行时间。

  这里我们通过一个map来聊聊具体的做法,我们有一个需要并发访问的map,对这个map共有三个操作,insert、get和delete,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package thread_safe_map

import (
"fmt"
)

type message struct {
key interface{}
value interface{}
ok chan struct{}
err chan error
}

type MyMap struct {
mp map[interface{}]interface{}

getch chan message
delch chan message
insch chan message
}

func NewMyMap() *MyMap {
m := &MyMap{
mp: make(map[interface{}]interface{}),
getch: make(chan message, 100),
delch: make(chan message, 100),
insch: make(chan message, 100),
}
go m.schedule()
return m
}

func (m *MyMap) Get(key interface{}) (interface{}, error) {
msg := message{
key: key,
ok: make(chan struct{}),
err: make(chan error),
}
defer close(msg.ok)
defer close(msg.err)

m.getch <- msg

select {
case <-msg.ok:
return msg.value, nil

case err := <-msg.err:
return nil, err
}
}

func (m *MyMap) Insert(key interface{}, value interface{}) error {
msg := message{
key: key,
value: value,
ok: make(chan struct{}),
err: make(chan error),
}
defer close(msg.ok)
defer close(msg.err)
m.insch <- msg

select {
case <-msg.ok:
return nil

case err := <-msg.err:
return err
}
}

func (m *MyMap) Delete(key interface{}) (interface{}, error) {
msg := message{
key: key,
ok: make(chan struct{}),
err: make(chan error),
}
defer close(msg.ok)
defer close(msg.err)
m.delch <- msg

select {
case <-msg.ok:
return msg.value, nil

case err := <-msg.err:
return nil, err
}
}

func (m *MyMap) schedule() {
for {
select {
case msg := <-m.getch:
value, exists := m.mp[msg.key]
if !exists {
msg.err <- fmt.Errorf("key %v not found", msg.key)
} else {
msg.value = value
}
msg.ok <- struct{}{}

case msg := <-m.delch:
value, exists := m.mp[msg.key]
if !exists {
msg.err <- fmt.Errorf("key %v not found", msg.key)
} else {
delete(m.mp, msg.key)
msg.value = value
}
msg.ok <- struct{}{}

case msg := <-m.insch:
m.mp[msg.key] = msg.value
msg.ok <- struct{}{}
}
}
}

在这个Demo中,只有一个goroutine(schedule)对map进行访问,所以也就不会存在数据竞争,通过这种方法我们可以实现一个lock free的数据结构。

NOTE:上面这个Demo并不能直接用于生产环境中,他还是存在很大的性能问题,如通过一个goroutine来执行对map的操作回导致操作的串行化,此时这个map就是整个系统的性能瓶颈。虽然这种操作方式可以省去加锁和释放锁过程中的性能开销,但如果需要频繁的对这个map进行访问,哪这个map就会是整个系统的瓶颈,所以这里只是提供了一种通过管道来实现lock free的思路而不是对map的高性能实现。

如果你要实现一个高性能的map这里可以提供一个思路:1. 通过细粒度的锁来控制这个map,如分段加锁。 2. delete操作不真正的删除数据,而是只修改它的标记位为已删除。 3. 当然也可以采用读写锁

总结

  这篇博客到这里就结束了,其实我没有介绍太多关于goroutine和channel的基础知识,只是介绍了一些我觉得需要特别关注一下的地方和一些应用技巧,讲解并不全面,强大的goroutine和channel魔法是不可能通过一篇博客讲完的,需要我们不断的在应用中了解。所以最后,祝顺!