原文,这里为了方便理解我把worker pools翻译成线程池。

什么是缓冲区Channel

之前讨论的所有channel都是不带缓冲区的,因此读取和写入都会被阻塞。创建一个带缓冲区的channel也是可能的,这种channel只有在缓冲区满后再写入或者读取一个空的channel时才会被阻塞。

创建一个带缓冲区的channel需要一个额外的参数容量来表明缓冲区大小:

1
ch := make(chan type, capacity)

上面代码中的 capacity 需要大于0,如果等于0的话则是之前学习的无缓冲区channel。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
)


func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
fmt.Println(<- ch)
fmt.Println(<- ch)
}

上面的例子中,我们创建了一个容量为2的channel,所以在写入2个字符串之前的写操作不会被阻塞。然后分别在12、13行读取,程序输出如下:

1
2
naveen  
paul

另一个例子

我们再来看一个例子,我们在并发执行的goroutine中进行写操作,然后在main goroutine中读取,这个例子帮助我们更好的理解缓冲区channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("successfully wrote", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v,"from ch")
time.Sleep(2 * time.Second)

}
}

上面的代码,我们创建了一个容量是2的缓冲区channel,并把它作为参数传递给write函数,接下来sleep2秒钟。同时write函数并发的执行,在函数中使用for循环向ch写入0-4。由于容量是2,所以可以立即向channel中写入0和1,然后阻塞等待至少一个值被读取。所以程序会立即输出下面2行:

1
2
successfully wrote 0 to ch  
successfully wrote 1 to ch

当main函数中sleep2秒后,进入for range循环中开始读取数据,然后继续sleep2秒钟。所以程序接下来会输出:

1
2
read value 0 from ch  
successfully wrote 2 to ch

如此循环直到channel被关闭为止,程序最终输出结果如下:

1
2
3
4
5
6
7
8
9
10
successfully wrote 0 to ch  
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
)

func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve"
fmt.Println(<-ch)
fmt.Println(<-ch)
}

上面的程序,我们想向容量为2的channel中写入3个字符串。程序执行到11行时候将会被阻塞,因为此时channel缓冲区已经满了。如果没有其他goroutine从中读取数据,程序将会死锁。报错如下:

1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100

长度和容量

容量是指一个有缓冲区的channel能够最多同时存储多少数据,这个值在使用make关键字用在创建channel时。而长度则是指当前channel中已经存放了多少个数据。我们看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
)

func main() {
ch := make(chan string, 3)
ch <- "naveen"
ch <- "paul"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}

上面的代码中我们创建了一个容量为3的channel,然后向里面写入2个字符串,因此现在channel的长度是2。接下来从channel中读取1个字符串,所以现在长度是1。程序输出如下:

1
2
3
4
capacity is 3  
length is 2
read value naveen
new length is 1

WaitGroup

下一节我们将要介绍线程池(worker pools),为了更好的理解,我们需要先介绍WaitGroup,然后我们基于这个实现线程池。

WaitGroup用来等待一组goroutine都执行完毕,在这之前程序都会被阻塞。假设我们有3个goroutine,主程序会等待这3个goroutine都执行结束才会退出。不多说看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"sync"
"time"
)

func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}

func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}

WaitGroup是一种struct类型,我们在18行创建了一个默认值的WaitGroup,其内部是基于计数器实现的。我们调用Add方法并传递给其一个数字作为参数,计数器将增长传入参数的值。当调用Done方法,计数器将自减1。Wait方法阻塞goroutine直到计数器归零。

上面的代码中通过在循环中调用wg.Add(1)来使计数器变成3,同时启动3个goroutine,然后掉用wg.Wait()阻塞主goroutine,直到计数器归零。在函数process中,调用wg.Done()来减小计数器,一旦三个goroutine执行结束,wg.Done()将被执行3次,计数器归零,主goroutine解除阻塞。

传递wg的地址给goroutine是非常重要的!如果传递的不是地址,那么每个goroutine都将有一份拷贝,这样的话每个goroutine结束就不能通知到main函数了。

程序输出如下:

1
2
3
4
5
6
7
started Goroutine  2  
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing

你的输出结果可能和上面略有不同。

线程池(worker pools)

缓冲区channel一个重要的使用方法就是实现线程池。

通常来说,线程池就是一组线程的集合等待任务分配给他们,一旦完成任务,则继续等待下一个任务。

接下来我们实现一个线程池,来计算输入数字每一位的和。比如输入123,则返回9(1+2+3),输入给线程池的数字由伪随机算法生成。

下面是我们需要的核心步骤:

  • 创建一组goroutine集合监听缓冲区channel等待任务。
  • 向缓冲区channel添加任务。
  • 任务结束后向另一个缓冲区channel写入结果。
  • 从存储结果的channel读取数据并输出。

首先我们创建存储任务和结果的结构:

1
2
3
4
5
6
7
8
type Job struct {  
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}

每个Job都有一个id和一个randomno用来存储将要计算的随机数。而Result类型则包括Job属性和sumofdigits存储结果。

接下来创建缓冲区channel来接收任务和结果:

1
2
var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

goroutine从jobs中获取任务,并向results写入结果。

下面的digits函数用来计算求和并且返回结果,我们通过Sleep来模拟进行耗时的计算。

1
2
3
4
5
6
7
8
9
10
11
func digits(number int) int {  
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}

接下来的函数创建goroutine:

1
2
3
4
5
6
7
func worker(wg *sync.WaitGroup) {  
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}

通过读取jobs中的任务来创建Result结构,并存储函数digits计算后的结果,然后再将其写入results这个channel。这个函数接收一个WaitGroup类型的指针参数wg,并且在计算完成后调用wg.Done()

createWorkerPool这个函数用来创建线程池:

1
2
3
4
5
6
7
8
9
func createWorkerPool(noOfWorkers int) {  
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}

上面这个函数创建了包含noOfWorkers个goroutine的线程池,创建goroutine之前调用wg.Add(1)来增加计数器,然后将wg的地址传递给worker函数。创建完成后,使用wg.Wait()等待所有的goroutine执行完毕,然后又调用close函数关闭results这个channel,这样以后就没有任何goroutine能写入数据了。

接下来,我们来编写函数向线程池分配任务:

1
2
3
4
5
6
7
8
func allocate(noOfJobs int) {  
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}

上面这个函数通过传入的参数决定写入的任务数量,随机数最大值是998,并且使用循环中的计数器i作为ID来创建job结构并写入jobs,完成后关闭jobs

接下来创建函数读取results这个channel并且打印输出:

1
2
3
4
5
6
func result(done chan bool) {  
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}

上面这个函数读取results并且打印id、随机数和结果,最后向done这个channel写入数据表明其已经打印了全部的结果。

万事具备,让我们完成main函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {  
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

首先我们记录程序开始执行的时间,最后用结束时间减去开始时间计算程序运行时常,我们需要这个时常来比较不同数量的线程池的差异。

创建名为done的channel,并传递给result函数,这样就可以打印输出并且在完成全部输出后得到通知了。

最后创建了10个goroutine的线程池,并通过读取done来等待计算全部完成。

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

运行结果如下:

1
2
3
4
5
Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds

程序会有100行的输出,因为我们创建了100个job,你的输出顺序可能和我不同,并且时间也可能不一样,这取决于硬件配置。在我这总共用时20秒。

接下来提高noOfWorkers到20,我们提高了线程池中goroutine的数量(翻了一倍),运行时间绝对应该减少(接近一半)。在我的机器上,程序输出如下:

1
2
...
total time taken 10.004364685 seconds

这样我们可以明白了,线程池中goroutine增加会让程序运行时间减少。你可以随意调整main中的noOfJobsnoOfWorkers的值来分析结果。