13.goroutine和channel

Saturday, August 19, 2023

一次只做一件事并不总是完成任务最快的方法。goroutine可以让程序同时处理几个不同的任务,goroutine可以使用channel来协调它们的工作,channel允许goroutine互相发送数据并同步,这样一个goroutine就不会领先与另一个goroutine。

检索网页

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	responseSize("https://example.com/")
	responseSize("https://golang.org/")
	responseSize("https://golang.org/doc")
}

func responseSize(url string) { //将url作为参数,将获取页面的代码移动到单独的函数
	fmt.Println("Getting", url)    //打印正在检索的url
	response, err := http.Get(url) //获取给定的url
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(body)) //字符切片的大小与页面的大小相同

}

多任务

通过同时执行多个任务来找到加快程序运行速度的方式。

程序对responseSize进行几次调用,每次一个,对responseSize的每次调用都建立到网站的网络连接,等待网站响应,打印响应大小并返回,只有一个调用响应返回时,下一个调用才能开始。

如果有一种方式能同时运行所有三个responseSize调用,这个程序只需三分之一的时间即可完成。

使用goroutine的并发性

当responseSize调用http.Get时,程序必须在等待网站的响应,只能干等没有别的事情做。

另一个程序可能需要等待用户输入,再另一个可能需要等待数据从文件中读取,很多情况下,程序只能等待。

并发性允许程序暂停一个任务并处理其他任务:等待用户输入的程序可能在后台执行其他处理,读取文件时更新进度条,responseSize程序可能在等待第一个请求完成时发出其他网络请求。

并行:同时运行任务,一台只有一个处理器的计算机一次只能运行一个任务,但现代计算机都有多个处理器,计算机可能在不同的处理器之间分配并发任务,以便于同时运行。

goroutine允许并发:暂停一个任务来处理其他任务;在某些情况下,它们允许并行:同时处理多个任务!

在Go中,并发称为 goroutine ,其他语言有类似的概念,叫做线程,但是 goroutine 比线程需要更少的计算机内存,启动和停止时间更少,这意味着可以同时运行更多的goroutine。

启动一个goroutine,可以使用go语句,只是一个普通的函数或方法调用,前面有go关键字:

go myFunction()  //go关键字+函数调用
go otherFunction("arguments")

注意,我们说的是另一个goroutine,每个Go程序的main函数都是使用goroutine启动,因此每个Go程序至少运行一个goroutine。

使用goroutine

这里有个程序,一次调用一个函数,a函数使用循环打印字符串 a 50次,b函数打印字符串 b 50次,main函数调用a,然后调用b,最后在退出时打印一条消息。

package main

import "fmt"

func a() {
	for i := 0, i < 50; i++ {
		fmt.Print("a")
	}
}

func b() {
	for i := 0, i < 50; i++ {
		fmt.Print("b")
	}
}

func main() {
	a()
	b()
	fmt.Println("end main()")
}

就像main函数包含了a函数所有代码,然后是b函数所有代码,最后是自己的代码。要在新的goroutine中启动a和b函数,只需在函数调用前添加go关键字,这能让新的goroutine与main函数同时运行:

func main() {
	go a()
	go b()
	fmt.Println("end main()")
}

但是运行这个程序,将只能看到来自main函数末尾的Println调用的输出,不会看到来自a或b函数的任何内容。

Go程序在main goroutine(调用main函数的goroutine)结束后立即停止运行,即使其他goroutine仍在运行,main函数在a和b函数中的代码运行之前就完成了。

需要保持main goroutine运行,直到a和b函数goroutine完成,需要暂停main goroutine一段时间,这样其他goroutine就可以运行。

func main() {
	go a()
	go b()
	time.Sleep(time.Second)  //暂停main goroutine1秒
	fmt.Println("end main()")
}

当程序在两个goroutine之间切换时,两个输出将混合在一起,当main goroutine唤醒时,会调用fmt.Println并退出。

不能直接控制goroutine何时运行

每次运行程序时,可能会看到 goroutine 以不同的顺序运行,也无法知道上一个程序何时会在两个goroutine之间切换。

正常情况下,Go不能保证何时在goroutine之间切换,或者切换多长时间,这允许goroutine以最有效的方式运行,若需要运行顺序,那么需要使用channel来同步它们。

go语句不能使用返回值

切换到goroutine带来了另一个需要解决的问题:不能在go语句中使用函数返回值。假设想要改变responseSize来返回页面大小,而不是直接打印它:

func main() {
	var size int
	size = go responseSize("https://example.com/")  //此段代码无效
	fmt.Println(size)  //此段代码无效
	size = go responseSize("https://golang.org/")  //此段代码无效
	fmt.Println(size)  //此段代码无效
	size = go responseSize("https://golang.org/doc/")  //此段代码无效
	fmt.Println(size)  //此段代码无效
	time.Sleep(5 * time.Second)
}

func responseSize(url string) int {  //添加返回值
	fmt.Println("Getting", url)
	response, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal(err)
	}
	return len(body)  //返回响应大小,而不是打印它

}

会编译错误,编译器阻止尝试从使用go语句调用的函数中获取返回值。这实际上是好事,当将responseSize作为go语句的一部分调用时,会说“在单独的goroutine中运行responseSize,将一直运行此函数中的指令。” responseSize函数不会立即返回值,必须等待网页响应,但是 main goroutine 中的代码会立即期望一个返回值,但目前还没有返回值。

size = go responseSize("https://example.com/")  //去运行这个,不会再等了
fmt.Println(size)  //那么返回什么呢

在go语句中调用的任何函数都是这样,而不仅仅是像responseSize这样的长时间运行函数,不能指望返回值会及时准备好,因此Go编译器会阻止任何使用它们的尝试。

Go不允许使用go语句调用的函数的返回值,因为在尝试使用它之前,不能保证返回值已经准备好。

channel

但是goroutine之间有一种交流方式:channel,channel不仅允许将值从一个goroutine发送到另一个goroutine,还确保在接收的goroutine尝试使用该值之前,发送的goroutine已经发送了该值。

使用channel的唯一方法是从一个goroutine到另一个goroutine的通信。

为了演示channel,需要:

  • 创建一个channel
  • 编写一个函数,该函数接收一个channel作为参数,我们将在一个单独的goroutine中运行这个函数,并使用它通过channel发送值。
  • 在初始的goroutine中接收发送的值

每个channel只携带特定类型的值,因此可能有一个channle用于int值,另一个channel用于struct类型的值,要声明包含channel的变量,可以使用chan关键字,然后是channel将携带的值的类型。

var myChannel chan float64

要实际创建channel,需要调用内置 make 函数(与创建映射和切片的函数相同)。传递make要创建的channel的类型(应该与要赋值给它的变量的类型相同)。

var myChannel chan float64  //声明一个变量来保存channel
myChannel =  make(chan float64)  //实际创建channel

不是单独声明channel变量,在大多数情况下,使用一个短变量声明更容易:

myChannel := make(chan float64)  //创建一个channel并立即声明一个变量

使用channel发送和接收值

要在channel上发送值,使用<-运算符,从发送的值指向发送该值的channel。

myChannel <- 3.14   //向名为 "myChannel" 的channel发送一个值"3.14"

还可以使用 <- 接收来自channel的值,但是位置不同:将箭头放在接收channel的左侧,看起来像从channel中取出一个值。

<-myChannel

例:
向greeting添加一个myChannel参数,接受一个包含字符串值的channel,greeting现在不是返回一个字符串值,而是通过myChannel发送一个字符串。

在main函数中,使用内置的make函数创建要传递给greeting的channel,然后调用greeting来作为一个新的goroutine,使用单独的goroutine很重要,因为channel应该只用于goroutine之间的通信。然后从传递给greeting的channel中接收一个值,并打印返回字符串。

func greeting(myChannel chan string) { //将channel作为参数
	myChannel <- "hi"  //通过channel发送给一个值
}

func main() {
	myChannel := make(chan string)  //创建一个新的channel
	go greeting(myChannel)  //将channel传递给在新goroutine中运行的函数
	fmt.Println(<-myChannel)  //从channel接收值
}

不必将从channel接收的值直接传递给Println,可以在任何需要值的上下文中从channel接收(在任何可以使用变量或函数的返回值的地方)。比如可以先将接收到的值赋给一个变量:

receiveValue := <-myChannel  //也可以将接收的值存储在一个变量中
fmt.Println(receiveValue)

同步goroutine与channel

channel确保发送的goroutine在接收channel尝试使用该值前已经发送了该值,channel通过blocking(阻塞)—暂停当前goroutine中的所有进一步操作来实现。发送操作阻塞发送goroutine,直到另一个goroutine在同一cahnnel上执行了接收操作,反之亦然,接收操作阻塞接收goroutine,直到另一个goroutine在同一channel上执行了发送操作,这个行为允许goroutine同步它们的动作,协调它们的时间。

例:
创建两个channel并将它们传递给两个新goroutine中的函数,然后main goroutine从这些channel接收值并打印它们。与goroutine反复打印“a”或“b”的程序不同,可以预测这个程序的输出:总是按顺序打印“a”,然后打印“d” “b” “e” “c” “f”。

func abc(channel chan string) {
	channel <- "a"
	channel <- "b"
	channel <- "c"
}

func def(channel chan string) {
	channel <- "d"
	channel <- "e"
	channel <- "f"
}

func main() {
	channel1 := make(chan string)  //创建channel
	channel2 := make(chan string)  //创建channel
	go abc(channel1)  //将channel传递给新goroutine中运行的函数
	go def(channel2)  //将channel传递给新goroutine中运行的函数
	fmt.Print(<-channel1)  //从channel接收和打印值
	fmt.Print(<-channel2)
	fmt.Print(<-channel1)
	fmt.Print(<-channel2)
	fmt.Print(<-channel1)
	fmt.Print(<-channel2)
	fmt.Println()
}

输出:

adbecf

abc goroutine每次向channel发送一个值时都会阻塞,直到main goroutine接收到它为止,def goroutine也是如此。main goroutine称为abc goroutine和def goroutine的协调器,只有当它准备读取它们发送的值时,才允许它们继续。

abc 和 def 函数都在单独的goroutine并发启动,abc 函数尝试将 “a” 发送到 channel1,由于channel1是无缓冲的,abc 会阻塞,直到main goroutine 从该通道上接收数据。main goroutine 使用 fmt.Print(<-channel1) 接收并打印“a”,此时 abc 函数中的阻塞解除,它会尝试将“b”发送到channel1,但同样会阻塞,因为需要一个接收操作。def 函数与 abc 函数类似,它尝试将“d”发送到 channel2,并在main goroutine接收并打印“d”之后继续执行,以此类推,main goroutine会交替从两个通道接收并打印数据,这就是为什么顺序是 “adbecf”。

观察goroutine同步

abc goroutine 和 def goroutine 通过它们的channel发送它们的值的速度很快,所以看不到发生了什么,下边这个程序可以减慢速度,方便观察阻塞:

package main

import (
	"fmt"
	"time"
)

func reportNap(name string, delay int) { //休眠goroutine名字、时间
	for i := 0; i < delay; i++ {
		fmt.Println(name, "sleeping")
		time.Sleep(1 * time.Second)
	}
	fmt.Println(name, "wakes up!")
}

func send(myChannel chan string) {
	reportNap("sending goroutine", 2)
	fmt.Println("***sending value***")
	myChannel <- "a" //在main 仍处于休眠状态时阻塞此发送
	fmt.Println("***sending value***")
	myChannel <- "b"
}

func main() {
	myChannel := make(chan string)
	go send(myChannel)
	reportNap("receiving goroutine", 5)
	fmt.Println(<-myChannel)
	fmt.Println(<-myChannel)
}

从reportNap函数开始,该函数当前goroutine休眠指定的秒数,goroutine休眠时,每一秒会打印一个通知,说它在休眠。

添加一个send函数,将在goroutine中运行,并将两个值发送到一个channel,在发送任何东西之前,首先调用reportNap,这样它的goroutine会休眠2秒。

在main goroutine中,创建一个channel并将其传递给send,然后再次调用reportNap,使此goroutine休眠5秒(比send goroutine长3秒),最后在channel上执行两个接收操作。

当运行这个程序时,会看到两个goroutine在前2秒都处于休眠状态,然后send goroutine醒来并发送它的值,但它没有做任何进一步的事情,发送操作阻塞了send goroutine,直到main goroutine接收到该值。

这不会马上发生,因为main goroutine仍然需要再休眠3秒,当它醒来时,从channel接收值,此时,send goroutine才能解除阻塞,才能发送第二个值。

输出:

receiving goroutine sleeping  //发送和接收goroutine都休眠了
sending goroutine sleeping  //发送和接收goroutine都休眠了
receiving goroutine sleeping  //发送和接收goroutine都休眠了
sending goroutine sleeping  //发送和接收goroutine都休眠了
sending goroutine wakes up!  //发送goroutine醒来,并发送一个值
***sending value***  //发送goroutine醒来,并发送一个值
receiving goroutine sleeping  //接收goroutine还在休眠
receiving goroutine sleeping  //接收goroutine还在休眠
receiving goroutine sleeping
receiving goroutine wakes up!  //接收goroutine醒来,并接收一个值
a  //接收goroutine醒来,并接收一个值
***sending value***  //此时,发送goroutine才能解除阻塞,然后发送第二个值
b

下面是之前的演示代码,进行如下的更改:

func greeting(myChannel chan string) {
	myChannel <- "hi"
}
func main() {
	myChannel := make(chan string)
	go greeting(myChannel)
	fmt.Println(<-myChannel)
}
如果这样 会失败,因为
myChannel <- “hi from main”
从main函数中发送一个值到channel
会得到一个“all goroutine are sleep-deadlock!”的错误,因为main goroutine阻塞了,还在等待另一个goroutine从channel接收,但是另一个goroutine没有做任何接收操作,所以main goroutine保持阻塞状态
go greetine(myChannel)
在调用greeting之前删除go关键字
这将导致greeting函数在main goroutine中运行,也会因为死锁错误而失败,原因与上面一样:greeting中的发送操作导致main goroutine阻塞,但是没有其他goroutine执行接收操作,所以会保持阻塞状态
myChannel <- “hi”
删除向channel发送值的行
这也会导致死锁,原因是main goroutine试图接收一个值,但现在没有任何东西可以发送值
fmt.Println(<-myChannel)
删除从channel接收值的行
greeting中的发送操作会导致goroutine阻塞,但是由于没有接收操作使main goroutine阻塞,所以main立即完成,程序结束时不产生任何输出

使用channel修复程序

package main

import ( //不使用time.Sleep,删除time包
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func responseSize(url string, channel chan int) { //将向responseSize传递一个channel,以便发送页面大小
	fmt.Println("Getting", url)
	response, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal()
	}
	channel <- len(body) //不返回页面大小,通过channel发送
}

func main() {
	sizes := make(chan int)                        //创建一个int值channel
	go responseSize("https://example.com/", sizes) //每次调用responseSize时都将channel传递过去
	go responseSize("https://golang.org/", sizes)
	go responseSize("https://golang.org/doc", sizes)
	fmt.Println(<-sizes) //channel上将有三个发送,所以要做三个接收
	fmt.Println(<-sizes)
	fmt.Println(<-sizes)
}

在main函数中,调用make创建int值的channel,更新对responseSize的每个调用,来添加channel作为参数,最后,在channel上执行三个接收操作,每个对应一个responseSize发送的值。

运行这个程序,会看到程序的完成速度与网站响应速度一样快。

接下来继续优化,为使代码更干净,可以将要检索的url列表存储在一个切片中,然后使用循环来调用responseSize,并从channel接收值。

responseSize无需改变,只需更改main函数,用想要的url创建一个字符串切片,然后对切片进行循环,并使用当前url和channel来调用responseSize,最后执行第二个单独的循环,对切片中的每个url运行一次,并从channel接收和打印一个值。(在单独的循环中做很重要,若在启动responseSize goroutine的同一个循环中接收值,将导致main goroutine阻塞,直到接收完成,将返回一次一个页面的请求。)

func main() {
	sizes := make(chan int)
	urls := []string{ //将url移动到一个切片中
		"https://example.com/",
		"https://golang.org/",
		"https://golang.org/doc",
	}
	for _, url := range urls {
		go responseSize(url, sizes) //对每个url调用responseSize
	}
	for i := 0; i < len(urls); i++ { //对每一个responseSize发送,都从channel接收一次
		fmt.Println(<-sizes)
	}
}

更新channel以携带一个struct

ressponseSize函数还有一个问题,我们不知道输出的顺序,因为没有把页面url与响应大小放在一起,所以不知道它们之间的关系。

channel可以像携带基础类型一样携带切片、映射和struct等复合类型,我们可以创建一个struct类型,它将存储一个页面url及其大小,这样就可以通过channel将两者一起发送了。

将使用底层的struct类型声明一个新的Page类型,Page将有一个URL字段来记录页面的URL,以及一个Size字段来记录页面大小。

将更新responseSize上的channel参数以保存新的Page类型,而不仅仅是int页面大小,让responseSize使用当前URL和页面大小创建一个新的Page值,并将其发送到channel。

在main中,将更新channel在调用中保存的类型,当从channel接收一个值时,将是一个Page值,因此将同时打印它的URL和Size字段。

type Page struct { //声明一个带有需要的字段的struct类型
	URL  string
	Size int
}

func responseSize(url string, channel chan Page) { //传递给responseSize的channel将携带Page,而不是int
	fmt.Println("Getting", url)
	response, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Fatal()
	}
	channel <- Page{URL: url, Size: len(body)} //返回一个包含当前URL和页面大小的Page

}

func main() {
	pages := make(chan Page) //更改channel保存的类型
	urls := []string{
		"https://example.com/",
		"https://golang.org/",
		"https://golang.org/doc",
	}
	for _, url := range urls {
		go responseSize(url, pages) //将channel传递给responseSize
	}
	for i := 0; i < len(urls); i++ {
		page := <-pages                              //接收Page
		fmt.Println("%s: %d\n", page.URL, page.Size) //将URL和大小一起打印
	}
}

现在输出将把页面大小和URl配对,以前,我们的程序必须一次请求一个页面,goroutine让我们在等待网站响应时开始处理下一个请求,大大提高了速度。

Golang打怪升级

14.其余的东东

12.从失败中恢复

comments powered by Disqus