并发编程(1)

Goroutine

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type c struct {
word string
wordId int
err error
place int
}

func main() {
cpunum := runtime.NumCPU()
runtime.GOMAXPROCS(cpunum)
ch := make(chan c, 100)
t1 := time.Now().Unix()
for i := 0; i < 300; i++ {
go test(i, ch)
}
for i := 0; i < 300; i++ {
select {
case res := <-ch:
fmt.Println(res)
}
}
t2 := time.Now().Unix()
log.Println(t2, t1)
}

func test(i int, ch chan c) {
time.Sleep(time.Second)
res := c{"test", i, nil, 0}
ch <- res
return
}

Channel

操作符<-

如果通过管道发送一个值,则将<-作为二元操作符使用。通过管道接收一个值,则将其作为一元操作符使用:

1
2
ic <- 3        // 往管道发送3
work := <-wc // 从管道接收一个指向Work类型值的指针

channel特点

  • Channel是Goroutine沟通的桥梁,大部分是阻塞同步的
  • 通过make创建,close关闭
  • channel是引用类型
  • 可以使用for range来迭代不断的操作
  • 可以设置单向或双向通道
  • 可以设置缓存大小,在未被填满之前不会发生阻塞

通道阻塞

对于无缓冲的channel,通道的发送/接收操作在对方准备好之前是阻塞

  • 对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。
  • 对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    package main

    import "fmt"

    func main() {
    ch1 := make(chan int)
    go pump(ch1) // pump hangs
    // 只是接受结果一次,也就是说,下面的send方再插入一个1,2就插入不进去了,阻塞了
    fmt.Println(<-ch1) // prints only 0
    }

    func pump(ch chan int) {
    for i := 0; ; i++ {
    ch <- i
    }
    }
    // 结果:0

常见错误示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
)

func f1(in chan int) {
fmt.Println(<-in)
}

func main() {
out := make(chan int)
out <- 2
go f1(out)
}

上述例子会照成死锁,原因很简单,一般通讯的话,有两种情况

  • 没有缓冲时,需要最少两个goroutine准备好
  • 有缓冲时,则是goroutine - 中间缓冲层 - goroutine之间的连接。

demo实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
// 使用make创建一个channel
channel := make(chan bool)
go func() {
fmt.Println("gogogo")
// 向channel存数据
channel <- true
// 关闭channel
close(channel)
}()
// 迭代channel
for v := range channel {
fmt.Println(v)
}
fmt.Println("this is base")
}
// 结果
➜ first go run helloworld.go
gogogo
true
this is base

由结果可以看出:先迭代完成才会进行下一步操作, 也就是说,迭代的过程是阻塞的,如果channel中没有值,迭代就会一直等待,一直等到它有值,
如果channel没有关闭,则会发生死锁(deadlock)

带缓冲的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
ch := make(chan string, 4)
ch <- "zhangsan1"
ch <- "zhangsan2"
ch <- "zhangsan3"
ch <- "zhangsan4"
// 如果加上这一行,则会死锁,是因为当超过缓冲区的时候,就成阻塞模式了
//ch <- "zhangsan5"
close(ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
fmt.Println(<-ch)
}
// 结果
➜ first go run helloworld.go
zhangsan1
zhangsan2
zhangsan3
zhangsan4
// 此处为string零值

可以看出带缓冲的channel略有不同。尽管已经close了,但我们依旧可以从中读出关闭前写入的3个值。第四次读取时,则会返回该channel类型的零值。向这类channel写入操作也会触发panic。

channel常与range配合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
ch := make(chan string)
go generate(ch)
for v := range ch {
fmt.Printf("%s\n", v)
}
}

func generate(ch chan string) {
ch <- "zhangsan1"
ch <- "zhangsan2"
ch <- "zhangsan3"
ch <- "zhangsan4"
ch <- "zhangsan5"
close(ch)
}

channel实际用途

可用作信号源

1
2
3
4
5
6
7
8
9
10
func main() {
fmt.Println("Begin doing something!")
c := make(chan bool)
go func() {
fmt.Println("Doing something…")
close(c)
}()
<-c
fmt.Println("Done!")
}

协同多个Goroutines

同上,close channel还可以用于协同多个Goroutines,比如下面这个例子,我们创建了100个Worker Goroutine,这些Goroutine在被创建出来后都阻塞在”<-start”上,直到我们在main goroutine中给出开工的信号:”close(start)”,这些goroutines才开始真正的并发运行起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
func worker(start chan bool, index int) {
<-start
fmt.Println("This is Worker:", index)
}

func main() {
start := make(chan bool)
for i := 1; i <= 100; i++ {
go worker(start, i)
}
close(start)
select {} //deadlock we expected
}

唯一的ID服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import "fmt"

func newUniqueIDService() <-chan string {
id := make(chan string)
go func() {
var counter int64 = 0
for {
// 无缓冲 -> 阻塞
id <- fmt.Sprintf("%x", counter)
counter += 1
}
}()
return id
}
func main() {
id := newUniqueIDService()
for i := 0; i < 10; i++ {
// 这边取一个,id插入一个(自增)
fmt.Println(<-id)
}
}

$ go run testuniqueid.go
0
1
2
3
4
5
6
7
8
9

select

select特点

  • 可以处理一个或多个channel的发送与接收
  • 同时有多个可用的channel时按随机顺序处理
  • 可用空的select阻塞main函数
  • 可设置超时

惯用法:for/select

我们在使用select时很少只是对其进行一次evaluation,我们常常将其与for {}结合在一起使用,并选择适当时机从for{}中退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for {
select {
case x := <- somechan:
// … 使用x进行一些操作

case y, ok := <- someOtherchan:
// … 使用y进行一些操作,
// 检查ok值判断someOtherchan是否已经关闭

case outputChan <- z:
// … z值被成功发送到Channel上时

default:
// … 上面case均无法通信时,执行此分支
}
}

终结workers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
ch, die := make(chan bool), make(chan bool)
for i := 0; i < 10; i++ {
go worker(die, i, ch)
ch <- true
}
time.Sleep(1 * time.Second)
// 此处终结worker
close(die)
}

func worker(die chan bool, index int, ch chan bool) {
fmt.Println("Begin: This is Worker:", index)
for {
select {
case <-ch:
fmt.Println("already worded:", index)
case <-die:
return
}
}
}

终结验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
//"time"
)

func worker(die chan bool) {
fmt.Println("Begin: This is Worker")
for {
select {
//case xx:
//做事的分支
// get channel message
case <-die:
fmt.Println("Done: This is Worker")
// send channel message
die <- true
return
}
}
}

func main() {
die := make(chan bool)
go worker(die)
// send channel message
die <- true
// get channel message
<-die
fmt.Println("Worker goroutine has been terminated")
}
```

# 等待goroutine完成
使用`sync`包
```go
func correct() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(n int) { // 使用局部变量
fmt.Print(n)
wg.Done()
}(i)
}
wg.Wait()
fmt.Println()
}

上述只是等待goroutine完成,却没有进行通讯。有错误值也无法返回。所以说,最好还是使用channel
如果我们希望通讯,只需要在此基础上增加channel通讯即可
例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"}
match := make(chan string, 1) // 为一个未匹配的发送操作提供空间
wg := new(sync.WaitGroup)
wg.Add(len(people))
for _, name := range people {
go Seek(name, match, wg)
}
wg.Wait()
select {
case name := <-match:
fmt.Printf("No one received %s’s message.\n", name)
default:
// 没有待处理的发送操作
}
}

// 函数Seek 发送一个name到match管道或从match管道接收一个peer,结束时通知wait group
func Seek(name string, match chan string, wg *sync.WaitGroup) {
select {
case peer := <-match:
fmt.Printf("%s sent a message to %s.\n", peer, name)
case match <- name:
// 等待某个goroutine接收我的消息
}
wg.Done()
}

利用多核

1
2
3
4
func init() {
numcpu := runtime.NumCPU()
runtime.GOMAXPROCS(numcpu) // 尝试使用所有可用的CPU
}