7.2 Channel

我们在本节会专门讲述Go语言所提倡的“应该以通信作为手段来共享内存”的最直接和最重要的体现——Channel。Channel也就是我们前面多次提到过的通道类型。它是Go语言预定义的数据类型之一。

Go语言鼓励使用与众不同的方法来共享值。这个方法就是使用一个通道类型值在不同的Goroutine之间传递值。Go语言的Channel就像是一个类型安全的通用型管道。(实际上,Channel的设计灵感来源于Tony Hoare在1985年首次公开的专著Communicating Sequential Processes中的论述。)

Channel提供了一种机制。它既可以同步两个被并发执行的函数,又可以让这两个函数通过相互传递特定类型的值来进行通信。虽然在有些时候使用共享变量和传统的同步方法也可以实现上述用途。但是,作为一个更高级的方法,使用Channel可以使让我们更加容易地编写清晰、正确的程序。

下面,我们就开始介绍与Channel有关的各方面知识。

7.2.1 Channel是什么

在Go语言中,Channel即指通道类型。有时,我们也用它来直接指代可以传递某种类型的值的通道。通道即是某一个通道类型的值,是该类型的一个实例。

1. 类型表示法

与切片类型和字典类型相同,通道类型是引用类型之一。一个泛化的通道类型的声明应该是这样的:

  1. chan T

其中,关键字chan是代表了通道类型的关键字,而T则表示了该通道类型的元素类型。通道类型的元素类型约束了可以经由此类通道传递的元素值的类型。例如,我们可以声明这样一个别名类型:

  1. type IntChan chan int

别名类型IntChan代表了元素类型为int的通道类型。又例如,我们可以直接声明一个chan int类型的变量:

  1. var intChan chan int

在被初始化之后,变量intChan就可以被用来传递int类型的元素值了。

以上展示了最简单的通道类型声明方式。这样的声明意味着该通道类型是双向的。也就是说,我们既可以向此类通道发送元素值,也可以从它那里接收元素值。此外,我们还可以声明单向的通道类型。这需要用到接收操作符<-。下面是只能被用来发送值的通道类型的泛化表示:

  1. chan<- T

只能被用来发送值的意思是,我们只能向此类通道发送元素值,而不能从它那里接收元素值。接收操作符<-形象的表示了元素值可能的流向。我们把这样的单向通道类型简称为发送通道类型。当然,我们也可以声明只能从中接收元素值的通道类型,形如:

  1. <-chan T

注意,这次接收操作符<-是在关键字chan的左边。这依然很形象,不是吗?相似地,此类的单向通道类型可以被简称为接收通道类型。

虽然在这样的类型声明中既有空格“ ”也有操作符<-,但是我们应该把它视为一个整体。它代表了一个类型,而不是某些复杂的东西。还记得吗?我们在第3章讲类型转换的时候提到过这样一个表达式:

  1. <-chan int(v)

在这个容易让我们产生疑惑的表达式中,chan int是一个整体,代表了一个通道类型。要注意,操作符<-并不是该类型的一部分。因此,该表达式的含义是,先将变量v的类型转换为chan int,然后再从该值中接收一个元素值。它相当于:

  1. <-(chan int(v))

它与表达式

  1. *string(v) // 相当于 *(string(v))

遵从类似的规则。我们应该在不添加作为辅助的圆括号的时候就知晓它们所表达的真正含义。正因为前面的那个原始的表达式具有这样的含义,所以当我们想把变量v的类型转换成一个接收通道类型的时候就应该这样表示:

  1. (<-chan int)(v)

在这里,附加的圆括号是必须的,也是必要的。否则,这个表达式就会被Go语言的编译器理解成之前的那种含义。

2. 值表示法

正因为通道类型是一个引用类型,所以一个通道类型的变量在被初始化之前它的值一定是nil。这也是此类型的零值。

与其他类型不同的是,通道类型的变量是被用来传递值的,而不是存储值的。所以,通道类型并没有对应的值表示法。它的值具有即时性,是无法用字面量来准确表达的。

3. 属性和基本操作

基于通道的通讯是在多个Goroutine之间进行同步的重要手段。而针对通道的操作本身也是同步的。在同一时刻,仅有一个Goroutine能向一个通道发送元素值,同时也仅有一个Goroutine能从它那里接收元素值。在通道中,各个元素值都是严格按照被发送至此的先后顺序排列的。最早被发送至通道的元素值会被最先接收。因此,通道相当于一个FIFO(先进先出)的消息队列。此外,通道中的元素值都具有原子性。它们是不可被分割的。通道中的每一个元素值都只可能被某一个Goroutine接收。已被接收的元素值会立刻被从通道中删除。

4. 初始化通道

我们已经知道,引用类型的值都需要使用内建函数make来初始化。通道类型也不例外。请看下面的调用表达式:

  1. make(chan int, 10)

这个表达式初始化了一个通道类型的值。传递给make函数的第一个参数表明此值的具体类型是元素类型为int的通道类型,而第二个参数则指出该值在同一时刻最多可以容纳10个元素值。也就是说,如果我们发送给该通道的元素值未被取走,那么该通道最多可以暂存(或者说缓冲)10个元素值。

当然,我们可以在初始化一个通道的时候省略第二参数值,像这样:

  1. make(chan int)

还记得吗?我们在初始化一个字典类型值的时候也可以这样做。但是,这两个类似的做法所起到的作用(或者说达到的效果)是截然不同的。

对于字典类型的值来说,是否传递第二个参数只会影响到该值的初始长度。其初始长度与第二个参数值相同或为0。由于字典类型的值的长度是可以自动增长的,所以这通常不会造成什么不同,除了在对程序性能非常敏感的情况下。而对于通道类型的值来说,传与不传第二个参数值会对被初始化的这个通道的特性产生非常大的影响。

最根本的原因是,一个通道类型的值的缓冲容量是固定不变的。它可同时容纳的元素值的最大数量永远等于在它被初始化时给定的第二个参数值。如果第二个参数值被省略了,那么就表示被初始化的这个通道无法缓冲任何元素值。发送给它的元素值必须被立刻取走。Go语言的运行时系统会依据我们的初始化方式来确定通道的行为。我们会在下一小节详细论述这一点。

实际上,我们把初始化时给定了第二个参数的通道称为缓冲通道,而把初始化时未给定第二个参数的通道称为非缓冲通道。我们下面仅会讲解操作缓冲通道的方法。关于非缓冲通道的操作方法,我们会在后面专门说明。

5. 接收元素值

接收操作符<-不但可以被作为通道类型声明的一部分,也可以被用来对通道进行操作(发送或接收元素值)。假如有这样一个通道类型的变量:

  1. strChan := make(chan string, 3)

内建函数make在被调用后会返回一个已被初始化的通道类型值作为结果。所以,这样的赋值语句使变量strChan成为了一个双向通道的代表。该通道的元素类型为string、容量为3

如果我们此时要从该通道中接收元素值的话,应该这样编写代码:

  1. elem := <-strChan

其中的接收操作符<-让我们很轻易就可以理解到这条语句所代表的含义——把strChan中的一个元素值赋给变量elem。不过,此时进行这类操作会使当前Goroutine被阻塞在这里。因为现在通道strChan中还没有任何元素值。当前Goroutine会被迫进入Gwaiting状态,直到strChan中有新的元素值可取时才会被唤醒。

我们在讲<-操作符的时候说过,像下面这样来编写这条赋值语句也是可以的:

  1. elem, ok := <-strChan

与前面的写法相同,当该通道中没有任何元素值时,当前Goroutine会被阻塞于此。如果在进行接收操作之前或过程当中该通道被关闭了,那么该操作会立即被结束,并且变量elem会被赋予该通道的元素类型的零值。采用上述两种写法都会是如此。由于相应元素类型的零值也可以被发送到通道中,所以当我们接收到这样一个元素值的时候就无从判断它所代表的含义,即它确实是通道中缓冲的一个元素值还是被用来表示该通道已经被关闭的标识。这时,第二种编写方法的优势就显现出来了。在特殊标记:=左边的第二个变量(这里是变量ok)被赋予的值会体现出实际的情况。在此语句中,变量ok必定会是一个布尔类型的值。当接收操作因通道关闭而被结束时,该值会为false(代表了操作失败),否则会为true。这样,我们就可以很容易地据此做出上述判断了。

我们把这些在特殊标记=:=的右边仅能是接收表达式的赋值语句称为接收语句。在其中的接收操作符<-右边的不仅仅可以是代表通道的标识符,还可以是任意的表达式。只要这个表达式的类型是通道类型即可。我们把这样的表达式称为通道表达式。

最后,还有一点需要注意,试图从一个未被初始化的通道类型值(即值为nil的通道类型的变量)那里接收元素值会造成当前Goroutine的永久阻塞!

6. Happens before

为了能够从通道接收元素值,我们应该先向它发送元素值。理所当然,一个值在被从通道中取出之前必须先存在于该通道内。更加正式地讲,如果一个通道是带缓冲的,那么:

  • 针对此通道的发送操作会被阻塞,直到被发送的元素值完全被复制到通道的缓冲区中。换句话说,通道缓冲元素值的这个动作的完成一定发生在相应的发送操作完成之前。

  • 针对此通道的接收操作也会被阻塞,直到当前Goroutine真正从中获取到一个元素值。换句话说,当前Goroutine(进行接收操作的那个Goroutine)接收到某个元素值的这个结果一定会形成在相应的接收操作完成之前。

  • 对于同一个元素值来说,把它发送给某个通道的操作一定会在从该通道接收它的操作完成之前完成。也就是说,在通道完全持有某一个元素值的副本之前,任何Goroutine都不可能从它那里接收到这个元素值。这一保证也是在前两个保证的基础上做出的。

以上就是与缓冲通道有关的“happens before”原则。Go语言的运行时系统会保证它们的有效性。这也是通道作为一个并发环境下的通讯工具应该具有的特性。

7. 发送元素值

通过了解上述原则,我们应该会对怎样操作一个缓冲通道有了更加清晰的认识。现在我们来讲讲向通道发送元素值的具体操作方法。这一操作是通过发送语句来完成的。

发送语句由通道表达式、接收操作符<-和代表元素值的表达式(以下简称元素表达式)组成。其中,元素表达式的类型必须与通道表达式的类型的元素类型之间存在可赋予的关系。也就是说,前者的值必须可以被赋给类型为后者的变量。

对接收操作符<-两边的表达式的求值会先于发送操作真正被执行之前。在对这两个表达式的求值完成之前,发送操作一定会被阻塞。并且,在这之后,发送操作是否得以进行还需要取决于其他因素。这在后面就会说明。

如果我们想向通道strChan发送一个元素值"a"的话,应该这样做:

  1. strChan <- "a"

接收操作符<-左边是将要接纳元素值的通道,而右边则是欲发送给该通道的那个元素值。在此表达式被求值之后,通道strChan中就缓冲了元素值"a"。下面我们再向它发送两个元素值:

  1. strChan <- "b"
  2. strChan <- "c"

现在,strChan中已经缓冲了3个元素值。这已经是它能够容纳元素值的最大数量了。我们已经知道,一个通道的缓冲容量是固定的。因此,在这之后,当某个Goroutine中的向通道strChan发送值的操作被执行的时候,该Goroutine会被阻塞在那里,直到该通道中有足够的空间容纳该元素值为止。在我们再从该通道中接收一个元素值之后,那个Goroutine就会被立即唤醒并且完成那个发送操作。

注意,与接收操作相同,当我们向一个值为nil的通道类型的变量发送元素值的时候,当前Goroutine也会被永久地阻塞!除此之外,如果我们试图向一个已被关闭的通道发送元素值,那么会立即引发一个运行时恐慌。即使发送操作正在因通道缓冲已满而被阻塞,这个通道的关闭也同样会使该操作引发一个运行时恐慌。这一点需要特别注意。我们肯定不希望因通道的关闭而使正常流程被迫中断。为了避免这样的流程中断,我们可以在select代码块中进行发送操作。这会在后面专门说明。

我们已经知道,针对通道的发送操作和接收操作都可能造成相关Goroutine的阻塞。如果有多个Goroutine因向同一个通道发送元素值而被阻塞,那么当该通道中有多余的缓冲空间的时候,最早被阻塞的那个Goroutine会最先被唤醒。也就是说,这里的唤醒顺序与发送操作的开始顺序相同。这对于接收操作来说也是如此。一旦通道中有了新的元素值,那么最早因从该通道接收元素值而被阻塞的那个Goroutine会最先被唤醒。无论是发送操作还是接收操作,运行时系统每次只会唤醒一个Goroutine。

关于这里的发送操作还有一点需要注意。那就是,在我们向通道发送一个值之后,该通道将会得到该值的一个副本,而非该值本身。当这个副本形成之后,我们对那个原来的值的任何修改都不会影响到通道中相应的副本。例如,我们有这样两个结构体声明:

  1. type Person struct {
  2. Name string
  3. Age uint8
  4. Address Addr
  5. }
  6. type Addr struct {
  7. city string
  8. district string
  9. }

注意,在结构体Person中有一个名为Address的字段。这个字段的类型也同时是一个结构体——Addr。现在,我们声明并初始化一个以Person为元素类型的通道,像这样:

  1. var personChan = make(chan Person, 1)

这个由变量personChan代表的通道的容量是1。然后,我们创建一个Person类型的值并把它发送给personChan

  1. p1 := Person{"Harry", 32, Addr{"Beijing", "Haidian"}}
  2. fmt.Printf("p1 (1): %v\n", p1)
  3. personChan <- p1

可以看到,p1所代表的那个人的大致住址是北京市海淀区。在发送操作完成之后,我们对p1的住址稍作改变:

  1. p1.Address.district = "Shijingshan"
  2. fmt.Printf("p1 (2): %v\n", p1)

第一行代码表明Harry已从北京市海淀区搬到了北京市石景山区。接下来,我们从personChan中的那个唯一的元素值,看看它是否会随着的p1的值的改变而被改变:

  1. p1_copy := <-personChan
  2. fmt.Printf("p1_copy: %v\n", p1_copy)

为了展现效果,我们在这小段代码中加入了3条打印语句。在运行这些代码之后,标准输出上会出现如下内容:

  1. p1 (0): {Harry 32 {Beijing Haidian}}
  2. p1 (1): {Harry 32 {Beijing Shijingshan}}
  3. p1_copy: {Harry 32 {Beijing Haidian}}

显而易见,通道中的元素值丝毫没有受到外界的影响。这说明了,在发送过程中进行的元素值复制并非浅表复制,而属于完全复制。这也保证了我们使用通道传递的值的不变性。

8. 关闭通道

从代码实现上来讲,关闭一个通道的操作实际上是非常简单的。通过对内建函数close的调用就能够实现。例如,如果我们想关闭通道strChan,那么如此编写一个调用表达式就可以了:

  1. close(strChan)

我们在前面讲过,不合时宜地关闭一个通道可能会给针对它的发送操作和接收操作带来问题。这样可能会对它们所在的Goroutine的正常流程的执行产生影响。因此,我们应该在保证安全的情况下进行关闭通道的操作。这会涉及一些技巧。比如我们会在后面讲到的for语句和select语句。不过在讲解这些高级方法之前,我们应该先明确一点:无论怎样都不应该在接收端关闭通道。因为在那里我们无法判断发送端是否还会向该通道发送元素值。如果非要这样做,那么就应该使用一些辅助手段来避免发送端引发运行时恐慌。然而,我们在发送端调用close以关闭通道却不会对接收端接收该通道中已有的元素值产生任何影响。这也是通道非常优秀的特性之一。示例如下:

  1. func main() {
  2. ch := make(chan int, 5)
  3. sign := make(chan byte, 2)
  4. go func() {
  5. for i := 0; i < 5; i++ {
  6. ch <- i
  7. time.Sleep(1 * time.Second)
  8. }
  9. close(ch)
  10. fmt.Println("The channel is closed.")
  11. sign <- 0
  12. }()
  13. go func() {
  14. for {
  15. e, ok := <-ch
  16. fmt.Printf("%d (%v)\n", e, ok)
  17. if !ok {
  18. break
  19. }
  20. time.Sleep(2 * time.Second)
  21. }
  22. fmt.Println("Done.")
  23. sign <- 1
  24. }()
  25. <-sign
  26. <-sign
  27. }

在该示例中,我们分别启用了两个Goroutine来对通道进行发送操作和接收操作。发送操作共有5次,每次操作的间隔是1秒。在所有发送操作都完成之后,我们会立即关闭该通道。另一方面,接收操作会持续地进行,每次操作的间隔是2秒。在通过接收语句中的第二个被赋值的变量得知该通道已被关闭之后,我们会结束包含它的for循环,并打印Done.。上述发送操作和接收操作的不同间隔时间的意义在于,接收端在没有将通道中已有的元素值全部接收完毕之前,该通道就会被关闭。

在我们运行这段代码之后,标准输出上会被打印出如下内容:

  1. 0 (true)
  2. 1 (true)
  3. 2 (true)
  4. The channel is closed.
  5. 3 (true)
  6. 4 (true)
  7. 0 (false)
  8. Done.

很明显,运行时系统并没有在通道ch被关闭之后立即把false作为相应接收操作的第二个结果,而是等到接收端把已在通道中的所有元素值都接收到之后才这样做。这确保了在发送端关闭通道的安全性。

由此,更确切地讲,调用close函数的作用是告诉运行时系统不应该再允许任何针对被关闭的通道的发送操作,该通道即将被关闭。虽然我们调用close函数只是让相应的通道进入关闭状态而不是立即阻止对它的一切操作,但是为了简化概念我们仍然笼统地称在对close函数的调用返回之后该通道就已经被关闭了。不过,读者应该将这其中的真正含义铭记于心。

对于同一个通道,运行时系统只允许我们关闭一次。因此,试图关闭一个已被关闭的通道会引发一个运行时恐慌。而若在调用close函数时的参数值是一个值为nil的通道类型的变量则也会是如此。

顺便提一下,我们在这段代码中还另外声明并初始化了一个通道sign。该通道在这里的作用是推迟主Goroutine被运行完成的时间。这利用到了通道本身的特性,也是我们在主Goroutine中启用的那两个Goroutine能够被正常运行完成的关键。我们在下一小节还会碰到这种惯用法。

如果读者需要了解与内建函数close有关的更多知识,请查阅本书的第3章。

9. 通道的长度与容量

内建函数lencap也是可以作用在通道之上的。它们的作用分别是获取通道的当前实际缓冲的元素值的数量(即长度)和通道可容纳元素值的最大数量(即容量)。通道的容量是在初始化该通道的时候已经设定的,并且在之后也不会被改变。而通道的长度则会随着实际的情况实时变化。

这两者的区别也决定了我们对它们的使用方法的不同。对于通道的容量来说,如果我们需要重复使用它,就应该先使用一个变量将它缓存起来。然而,对于通道长度,我们决不能这样做。因为我们是无法确定它变化的时间以及具体数值的。因此,我们应该在每次要获取通道的长度的时候都调用一次len函数。

除此之外,我们可以通过的容量来判断它是否带有缓冲。若其容量为0,那么它肯定就是一个不带缓冲的通道,否则它就应该是一个带缓冲的通道。

至此,我们已经介绍了通道类型及其值的表示法、基本属性,以及操作缓冲通道的基本方法。通道类型的表示法非常独特,其中必须包含关键字chan、空格“ ”和代表元素类型的字面量。然而,通道类型的值(之后也会简称为通道)却因其即时性而无法被表示。我们最应该记住的通道的特性就是它的自同步和其中元素值的原子性。这些特性在针对通道的接收操作和发送操作中体现得最为明显。在本节的后续部分,我们会陆续介绍各种与通道相关的规则以及惯用法。依照这些规则和惯用法,我们就能够充分发挥出这一独特的通讯工具的强大威力,并可以用它构建出非常复杂但依然稳固、清晰的并发程序。

7.2.2 单向Channel

我们在上一小节讲通道类型声明的时候提到过单向通道这个概念。但没有讲到它们的应用场景。单向通道可分为接收通道和发送通道。需要注意的是,无论哪一种单向通道,都不应该出现在一个变量的声明中。这是为什么呢?请试想一下,如果我们声明并初始化了这样一个变量:

  1. var uselessChan chan<- int = make(chan<- int, 10)

那么应该怎样去使用它呢?显然,一个只进不出的通道没有任何意义。虽然这行代码可以通过编译,但它其实没有什么可用之处。那么单向通道的应用场景又在哪里呢?

实际上,单向通道常常由双向通道变换而来。我们可以用这种变换来约束某个函数或某个使用方程序对通道的使用方式。例如,我们在上一章讲信号的时候介绍过,os/signal.Notify函数的声明是这样的:

  1. func Notify(c chan<- os.Signal, sig ...os.Signal)

该函数的第一个参数就是一个单向通道类型的。从表面上看,调用它的程序需要传入一个只能发送而不能接收的通道。然而,我们不应该传给它这样一个值。实际上,这个参数声明表达的含义是,os/signal.Notify函数内部只会对该通道发送元素值,而不会从该通道接收元素值。因为,试图从一个发送通道中接收元素值会造成一个编译错误。从另一方面看,调用该函数的程序应该只从这个通道中接收元素值,向其发送元素值只会造成纯粹的干扰。也就是说,参数c是函数内部向函数调用方传递系统信号的一个途径。这个途径的方向应该是单一的,通讯双方会分别使用到不同的操作(即发送或接收)。综上所述,我们在调用此函数的时候应该传入一个双向的通道,并且要自觉遵守该函数声明中隐含的约定。双向通道在被传递给该函数的过程中会被自动地转换为相应参数声明所示的单向通道。

函数os/signal.Notify以这样的声明方式向我们传达了它的第一个参数的真正意义。它是利用Go语言现有的语法规则做到这一点的。同时,这也是“代码即注释”这种编程风格的一个体现。这种Go语言特有的代码编写手法是值得我们学习和效仿的。

请想象一下,如果这样一个声明被包含在接口声明中会起到什么样的作用?例如:

  1. type SignalNotifier interface {
  2. Notify(c chan<- os.Signal, sig ...os.Signal)
  3. }

接口类型SignalNotifier的声明中包含了与os/signal.Notify函数完全一样的方法声明。声明一个接口类型的意义就在于会有一到多个自定义数据类型来实现它。它是对某一类数据类型的归纳和抽象。因此,这里的参数c的声明明确表达了一个实现规则,即所有实现该接口的数据类型的Notify方法内部只能向c发送元素值。这就相当于利用语法级别的约束来避免实现类型对参数c的实际值进行错误的操作。

现在,我们再来对SignalNotifier接口类型的声明稍作改变。改变后的声明是这样的:

  1. type SignalNotifier interface {
  2. Notify(sig ...os.Signal) <-chan os.Signal
  3. }

可以看到,我们把Notify方法中的第一个参数声明去掉了,然后为它添加了一个看起来与前者有些类似的结果声明。请注意接收操作符<-与关键字chan的位置关系。在结果声明中的是一个接收通道,而非发送通道。与前一个版本的Notify方法的声明恰恰相反,此方法声明的约束目标是方法的调用方,而非方法的实现方。Notify方法的调用方只能从其结果值中接收元素值,而不能向其发送元素值。

这两个版本的Notify方法传递系统信号的方式是相同的——使用单向通道。并且,系统信号在其中的单向通道中的传递方向也是相同的——从方法内部传至方法调用方。这表明它们所体现的功能完全一致。

这两个方法声明的真正不同在于对其中的单向通道的使用方式,即它们分别对单向通道一端的使用者进行了约束。这使得它们分别适用于不同的应用场景。前一个版本的方法声明更合适存在于接口类型中,因为它可以作为该接口的实现规则之一。后一个版本的声明更适用于结构体的方法和独立函数,原因是它隐含并约束了对函数或方法的结果值的使用方式。但是,这并不是绝对的。比如,在os/signal.Notify函数的声明中,参数c的类型就隐含了函数调用方对该通道的使用规则。虽然此规则是可以轻易被破坏的,但是这对于函数调用方来说没有任何好处。因此,这样是可以达到约束目的的。

当然,我们在同一个函数声明中连用上述两种方式也是可以的。例如,我们需要批量地更改一些人员信息(由在上一小节编写好的Person类型的值代表)中的住址。这其中有几个具体要求。

  • 我们事先并不知道这些人员都有哪些。

  • 住址的更改方式可以由使用者灵活掌握,并能够实时地替换。

  • 经过处理的人员信息应该自动地被传递给使用者。

这样一个批处理任务应该怎样实现呢?请读者先自己思考一下。

如果你确实经过了思考,请继续往下看。我们先按照上述要求定义一个接口:

  1. type PersonHandler interface {
  2. Batch(origs <-chan Person) <-chan Person
  3. Handle(orig *Person)
  4. }

接口类型PersonHandler中包含了两个方法声明。方法Batch被声明为了实现批量处理人员信息功能的方法。其中的参数声明可以满足第一个具体要求。需要被更改住址的人员信息可以经由参数origs传递至方法内部。而其中的结果声明则满足了第三个要求。方法调用方只要持有该方法的结果值就可以实时地接收到已被处理的人员信息。注意,在Batch方法的声明中的这两个通道分别对该方法和该方法的调用方使用它的方式进行了约束。至于第二个要求,我们是通过该接口中的另一个方法来实现的。这会在后面的Batch方法的实现中展现出来。

由于我们这里只关注与单向通道的使用有关的部分,所以下面仅讨论PersonHandler接口中的Batch方法的实现。不过在这之前,我们应该先展示一下该接口的实现类型的声明:

  1. type PersonHandlerImpl struct{}

这个声明很简单。它其实就是一个不包含任何字段的结构体类型。为了让这个类型真正实现接口类型PersonHandler,我们还要声明这样两个方法:

  1. func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
  2. // 省略若干条语句
  3. }
  4. func (handler PersonHandlerImpl) Handle(orig *Person) {
  5. // 省略若干条语句
  6. }

好了,现在我们就开始填充PersonHandlerImpl类型的Batch方法的方法体。首先,既然需要被更改的人员信息会通过单向通道origs传递进来,那么我们就应该不断地试图从该通道中接收它们。不过,我们应该处理通道已被关闭的情况。因为这象征着所有的人员信息都已被传递完毕,也预示着批处理流程执行的结束。所以,我们应该这样来编写Batch方法体的第一个版本:

  1. for {
  2. p, ok := <-origs
  3. if !ok {
  4. break
  5. }
  6. }

接下来,我们再来考虑作为结果值的那个单向通道。由于这个单向通道不来自于Batch的参数,所以该方法应该自行创建并初始化它。又由于Batch方法的返回并不意味着这一批处理过程的完成(既然该方法的结果值是一个单向通道,那么就应该充分体现出这种结果传输方式的优势,即传输应该是异步的)。综上所述,Batch方法体的第二个版本如下:

  1. dests := make(chan Person, 100)
  2. for {
  3. p, ok := <-origs
  4. if !ok {
  5. close(dests)
  6. break
  7. }
  8. handler.Handle(&p)
  9. dests <- p
  10. }

如上所示,我们先调用make函数并初始化了一个元素类型为Person、容量为100的双向通道dests。然后,每当从单向通道origs中接收到一个有效的人员信息p的时候,我们都会使用本类型的Handle方法来处理这个p。最后,我们会及时地把刚刚被处理完成的p发送给dests

可以看到,for代码块的执行会一直持续到通道origs已被关闭的时候。所以,在此之前,对Batch方法的调用是无法结束的。方法调用方也许会长时间得不到该方法的结果值。并且,如果是这样的话,我们使用通道作为Batch方法的结果值的做法也失去了意义。那么我们应该怎样做呢?

答案是启用一个Goroutine来执行for语句块。只有这样才能真正地实现已处理p的异步传输。请看下面的代码:

  1. func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
  2. dests := make(chan Person, 100)
  3. go func() {
  4. for {
  5. p, ok := <-origs
  6. if !ok {
  7. close(dests)
  8. break
  9. }
  10. handler.Handle(&p)
  11. dests <- p
  12. }
  13. }()
  14. return dests
  15. }

我们之前说过,基于通道的通讯是在多个Goroutine之间进行同步的重要手段。通道既能够被用来在多个Goroutine之间传递数据,又能够在数据传递的过程中起到同步的作用。在上面的这个Batch方法的第三个版本中,通道的这两个重要用途都被很好地展现了出来。如果我们把Batch方法的调用方所在的Goroutine称为G1,并把在该方法中新启用的Goroutine称为G2,那么利用通道在它们之前传递数据的示意如图7-1所示。

7.2 Channel - 图1

图 7-1 通道的作用示意

至此,通过对Goroutine和通道的使用,我们以异步的方式实现了对若干人员信息的批量处理。读者应该能够从这个示例中初步感受到通道的威力。这种实现方式要比使用其他同步手段(比如互斥量或条件变量)强很多。实际上,我们可以轻易地使用通道模拟出前面提到的大部分同步方法的应用流程。读者如果感兴趣的话,可以试着写出这些模拟代码。

更进一步地,如果我们要更完整地实现批量更改人员信息的流程的话,G1中的代码就可能会是这样:

  1. handler := getPersonHandler()
  2. origs := make(chan Person, 100)
  3. dests := handler.Batch(origs)
  4. fecthPerson(origs)
  5. sign := savePerson(dests)
  6. <-sign

其中,getPersonHandler函数会返回一个PersonHandler类型的结果值。这个值的动态类型可以是我们已经在前面部分实现了的PersonHandlerImpl类型,也可以是其他实现了PersonHandler接口的类型。函数fecthPerson的功能是从某处取出欲处理的人员信息并把它们发送给origs通道,而savePerson函数的功能则是从dests通道中取出已变更的人员信息并把它们存储到某处。此外,savePerson函数还会返回一个通道sign。该通道的作用是,在批处理完全执行结束之前阻塞G1。由于在通道中没有任何元素值的时候,对缓冲通道的接收操作会阻塞当前的Goroutine,所以实现此功能也是非常容易的。以下是上述3个函数的声明:

  1. func getPersonHandler() PersonHandler
  2. func fecthPerson(origs chan<- Person)
  3. func savePerson(dest <-chan Person) <-chan byte

请读者自行实现这3个方法。必要时,读者可以参照图7-2所示的流程图。

7.2 Channel - 图2

图 7-2 批量处理人员信息的流程

在这幅流程图中,我们以Goroutine为界划分出了4个子流程。为什么又多出了两个Goroutine呢?这主要是为了让此批处理流程实现完全的异步化。所以,建议异步地获取和存储人员信息,并使用现有的两个通道origsdests满足它们与其他Goroutine之间的同步和传递数据的需要。我们称用于获取人员信息的Goroutine为G3,而用于存储人员信息的Goroutine为G4。它们分别与fecthPerson函数和savePerson函数的功能相对应。可以看到,G3和G4的流程与G2的流程非常相似。所以把它们编写出来应该并不困难。

或许,读者应该先动手实现出普通版本的fecthPerson函数和savePerson函数。也就是说,不让它们启用Goroutine,而只是单纯地向通道origs发送原始的人员信息,以及从通道dests接收已被处理的人员信息。即先再想办法让这个批处理程序正确地运行起来。其中,人员信息的获取操作和存储操作可以用模拟的函数代替。

这两个函数的普通版本是否可以实现批处理流程的异步化呢?请读者先去编写它们,然后再来思考这个问题。

为了让读者看得更清楚,我们再通过图7-3描述一下人员信息经由两个通道在多个执行步骤之间的流转情况。

{%}

图 7-3 人员信息的流转示意

我们应该让每一个人员信息可以独立地穿过整个的批处理程序。在穿过的过程中,批处理程序中的多个执行步骤会先后对此人员信息进行处理。然而,普通版本的fecthPerson函数和savePerson函数会让已经被异步化的Batch方法所做的努力变得毫无意义。为什么要把所有需要处理的人员信息都获取到之后再对它们进行变更呢?这样会增长大多数人员信息的处理时间,也会使整个批处理流程笨重许多。

总之,对于运行在多核CPU的计算机上的程序来说,使用并发编程的手法总会比普通的串行编程更高效一些。但是,我们需要仔细考量,并发编程所带来复杂性是否值得我们这样去做。在Go语言的并发程序中,通道会成为联系各个Goroutine的纽带。它可以正确、清晰和高效地实现Goroutine之间的同步和实时的数据传递。我们借着介绍单向通道的机会,通过前面的这个示例展示了双向通道和单向通道的典型应用场景和使用方式。

最后,请注意,在这个已被异步化的批处理流程中,原先参与人员信息传递的G1已经退居幕后了。它现在的主要职责是初始化各种资源以及对G2、G3和G4的运行进行协调。在并发的程序中,我们总是应该构建一段可以统揽全局、协调各个部件的程序。

其实,这个批处理流程中的几个子流程以及前面所示的Batch方法的实现代码都依然存在着很大的优化空间。比如,我们可能会需要并发地处理或存储多个人员信息。但是,鉴于本小节的主题和篇幅,我暂时就讨论到这里。一个该流程的较完整的参考实现存放在goc2p项目的代码包chan1/oneway中,文件名为phandler.go。不过,强烈建议读者在实现了自己的版本并正确运行之后再去查看它。

经过前面的练习和思考之后,相信读者对双向和单向的通道的基本使用会有一个比较深刻的理解。请记住,直接初始化一个单向通道毫无意义。单向通道的真正含义是在一定作用域(某个代码块)内约束使用者对它的使用方式,也即暗示和控制相应数据的流转方向。

单向通道往往由双向通道转换而来。那么,单向通道是否可以被转换回双向通道呢?请记住这样一句话,通道所允许的数据传递方向是它的类型的一部分。对于两个通道类型而言,方向的不同就意味着它们类型的不同。也就是说,元素类型相同的双向通道、发送通道和接收通道都属于不同的类型。所以,我们不能像下面这样转换通道的类型:

  1. var ok bool
  2. ch := make(chan int, 1)
  3. _, ok = interface{}(ch).(<-chan int)
  4. _, ok = interface{}(ch).(chan<- int)
  5. sch := make(chan<- int, 1)
  6. _, ok := interface{}(sch).(chan int)
  7. rch := make(<-chan int, 1)
  8. _, ok := interface{}(rch).(chan int)

在上面这段代码中,每一个类型转换表达式的结果都会是否定的,即变量ok的值总会是false

因此,我们只能利用函数声明来约束通道的方向。比如,利用函数的参数声明把函数调用方所持的双向通道转换为单向通道,并提供给函数内部使用。又比如,利用函数的结果声明把函数内部所持的双向通道转换为单向通道,并提供给函数调用方使用。

注意,即使利用函数声明转换通道类型,也无法把单向通道转换为双向通道。并且,通过这种方式也不可能改变单向通道的方向。尝试这样做只会造成编译错误。请看下面的这个示例:

  1. ch1 := make(chan int, 1)
  2. f := func(c chan<- int) chan int {
  3. return c // 这里会造成编译错误
  4. }
  5. ch2 := f(ch1)

变量ch1是元素类型为int的双向通道。我们把它作为参数值传给了变量f代表的匿名函数。在这个匿名函数的内部,它变成了一个发送通道。然后,我们想以相似的方式让它变回双向通道,但是却失败了。该匿名函数中的return语句会造成一个编译错误。由此看来,我们只能用这种方式附加对通道的使用约束,而不能解除它。这难免让人有些遗憾,但是这也确实避免了因太过灵活而导致的混乱局面。

好了,通过认真阅读本小节的内容,你是否对单向通道的概念、使用和含义真正理解了呢?我想,答案应该是肯定的。

7.2.3 for语句与Channel

我们在讲for语句的时候已经提到过,使用该语句及其range子句可以持续地从一个通道中接收元素值。因此,我们在本小节只是再简单汇总一下与该用法有关的知识。

首先,我们来看一下这种用法的基本表现形式:

  1. var ch chan int
  2. // 省略若干条语句
  3. for e := range ch {
  4. fmt.Printf("Element: %v\n", e)
  5. }

我们先声明了一个双向通道,然后试图使用for语句接收其中的元素值。在单次的迭代中,range子句会尝试从通道ch中接收一个元素值,并把它赋给唯一的迭代变量e。注意,range子句的迭代目标不能是一个发送通道。与从发送通道中接收元素值的行为一样,这样做会造成一个编译错误。

我们在前面说过,从一个还未被初始化的通道中接收元素值,会导致当前Goroutine被永久地阻塞。当然,使用for来进行接收操作也不会例外。因此,上面的代码存在一个明显的缺陷。下面是改进的版本:

  1. var ch chan int
  2. // 省略若干条语句
  3. if ch != nil {
  4. for e := range ch {
  5. fmt.Printf("Element: %v\n", e)
  6. }
  7. }

这样确实可以避免因通道初始化问题导致的Goroutine阻塞。但是这种接收方式与普通的接收操作一样,当通道(注意,我们在本小节依然只针对缓冲通道)中没有任何元素值的时候,当前Goroutine依然会陷入阻塞。阻塞的具体位置会在其中的range子语句处。

语句for会不断地尝试从通道中接收元素值,直到该通道被关闭。在相关的通道被关闭后,若通道中已无元素值或当前的Goroutine正阻塞于此,则这条for语句的执行会立即结束。而当此时的通道中还有遗留的元素值时,运行时系统会等for语句把它们全部接收后再结束该语句的执行。当然,在结束执行该for语句之前,当前的Goroutine会先被唤醒。

经过上述说明,我们可以很容易地把已在上一小节中实现的Batch方法改造成这样:

  1. func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
  2. dests := make(chan Person, 100)
  3. go func() {
  4. for p := range origs {
  5. handler.Handle(&p)
  6. dests <- p
  7. }
  8. fmt.Println("All the information has been handled.")
  9. close(dests)
  10. }()
  11. return dests
  12. }

在这个版本的Batch方法中,for语句不断尝试接收通道origs中的元素值。每接收到一个元素值,for代码块中的代码都会立即对它进行处理,并通过dests通道把它传递给下一道工序。若在某次迭代开始时发现origs通道已被关闭且已无元素值可接收,那么运行时系统就会立即结束这条for语句的执行,并继续执行在它后面的语句。这次改造使得该Goroutine(G2)中的代码的数量减少了三分之一,并且在流程的实现上也更加清晰了。

在需要循环的接收通道中的元素值的场景下,我们总是应该优先使用for语句来实现。

7.2.4 select语句

在本小节,我们要介绍一个仅能被用于发送和接收通道中的元素值的专用语句——select语句。一个select语句在被执行的时候会选择执行其中的某一个分支。在表现形式上,select语句与switch语句非常类似,但是它们选择分支的方法是完全不同的。

1. 组成和编写方法

select语句中,每个分支依然以关键字case开始。但与switch语句不同的是,跟在每个case后面的只能是针对某个通道的发送语句或接收语句。我们在7.2.1节中专门介绍过这两种语句。select语句的另一个特点是,在select关键字右边并没有像switch语句那样的switch表达式,而是直接后跟花括号“{”。这也与它选择分支的方法有关。下面是select语句的典型用法的一个示例:

  1. var ch1 = make(chan int, 10)
  2. var ch2 = make(chan string, 10)
  3. // 省略若干条语句
  4. select {
  5. case e1 := <-ch1:
  6. fmt.Printf("1th case is selected. e1=%v.\n", e1)
  7. case e2 := <-ch2:
  8. fmt.Printf("2th case is selected. e2=%v.\n", e2)
  9. default:
  10. fmt.Println("default!")
  11. }

针对这条select语句中的每一个case,我们都初始化了一个通道。通道的类型不受任何约束。也就是说,在select语句中操作的那些通道的元素类型和容量都可以是任意的。另外,select语句也可以包含default case(也可以被称为默认分支)。如果select语句中的所有case都不满足选择条件且存在default case,那么default case就会被执行。

2. 分支选择规则

在开始执行select语句的时候,所有跟在case关键字右边的发送语句或接收语句中的通道表达式和元素表达式都会先被求值。求值的顺序是自上而下、从左到右的。无论它们所在的case是否有可能被选择都会是这样。我们通过下面的示例就可以证实这一点。

首先,我们需要准备如下几个变量:

  1. var ch3 chan int
  2. var ch4 chan int
  3. var chs = []chan int{ch3, ch4}
  4. var numbers = []int{1, 2, 3, 4, 5}

其中,变量chsnumbers分别代表了包含了有限元素的通道列表和整数列表。下面的select语句使用到了它们:

  1. select {
  2. case getChan(0) <- getNumber(2):
  3. fmt.Println("1th case is selected.")
  4. case getChan(1) <- getNumber(3):
  5. fmt.Println("2th case is selected.")
  6. default:
  7. fmt.Println("default!")
  8. }

这其中包含了两个通道表达式和两个元素表达式。通道表达式由对getChan函数的调用表达式代表,而元素表达式则由对getNumber函数的调用表达式代表。这两个函数的声明如下:

  1. func getNumber(i int) int {
  2. fmt.Printf("numbers[%d]\n", i)
  3. return numbers[i]
  4. }
  5. func getChan(i int) chan int {
  6. fmt.Printf("chs[%d]\n", i)
  7. return chs[i]
  8. }

我们在这两个函数中分别添加了打印语句。通过这些被打印出的内容,我们可以得知这些表达式被求值的顺序。当上面那个select语句被执行时,标准输出上会出现如下的内容:

  1. chs[0]
  2. numbers[2]
  3. chs[1]
  4. numbers[3]
  5. default!

上面内容的最后一行表示在本次执行select语句的过程中被选中并执行的是default case。不过,虽然前面那两个case都没有被选中,但是它们的表达式都被求值了。从前4行内容可以看出,它们的求值顺序正如我们前面所说的那样。也就是说,运行时系统会自上而下地求值每个case的表达式。并且,同一个case的多个表达式会以从左到右的顺序被求值。

这其中的通道是可以为nil的,不管代表它的是标识符还是表达式。但是,这样的话,它所属的case就会永远被无视,就像select语句中根本就没有包含它一样。例如,在下面的select语句中,被选中并执行的永远会是default case

  1. var ch5 chan int
  2. var ch6 chan string
  3. select {
  4. case ch5 <- 1:
  5. fmt.Println("1th case is selected.")
  6. case ch6 <- "1":
  7. fmt.Println("2th case is selected. ")
  8. default:
  9. fmt.Println("default!")
  10. }

这实际上是由这两种操作的特性决定的。还记得吗?针对值为nil的通道类型的变量的发送操作和接收操作都会使当前的Goroutine被永久地阻塞。因此,根据针对select语句的分支选择规则,这类case才会给予我们这样的表象。下面,我们来说说select语句分支选择的具体规则。

在执行select语句的时候,运行时系统会自上而下地判断每个case中的发送或接收操作是否可以被立即进行。这里的立即进行的意思是当前Goroutine不会因此操作而被阻塞。所以,对这个是否可立即进行的判定还需要依据通道的具体特性(缓冲或非缓冲)和那一时刻的具体情况来进行。

当发现第一个满足选择条件的case时,运行时系统就会执行该case所包含的语句。这也意味着其他case会被忽略。如果同时有多个case满足条件,那么运行时系统会通过一个伪随机的算法决定哪一个case将会被执行。例如,下面的代码会随机的向一个通道发送5个范围在[1,3]的整数:

  1. chanCap := 5
  2. ch7 := make(chan int, chanCap)
  3. for i := 0; i < chanCap; i++ {
  4. select {
  5. case ch7 <- 1:
  6. case ch7 <- 2:
  7. case ch7 <- 3:
  8. }
  9. }
  10. for i := 0; i < chanCap; i++ {
  11. fmt.Printf("%v\n", <-ch7)
  12. }

这段代码被执行后,打印在标准输出上的内容可能会是:

  1. 3
  2. 2
  3. 2
  4. 1
  5. 2

另一方面,如果被执行的select语句中的所有case都不满足选择条件并且没有default case的话,那么当前Goroutine就会一直被阻塞于此,直到某一个case中的发送或接收操作可以被立即进行为止。注意,如果这样的select语句中的所有case右边的通道都是nil,那么当前Goroutine就会永远地被阻塞在这条select语句上!我们的程序中永远不要出现像下面这样的代码:

  1. var ch8 chan int
  2. var ch9 chan string
  3. select {
  4. case ch8 <- 1:
  5. fmt.Println("1th case is selected.")
  6. case ch9 <- "1":
  7. fmt.Println("2th case is selected. ")
  8. }

如果当前程序只有主Goroutine且包含了这段代码的话,那么就会导致死锁的发生。

所以,在通常情况下,default case对于select语句来说总是有必要的。select语句只能包含一个default case,且这个特殊的case可以被放置在该语句的任何位置上:

  1. ch10 := make(chan int, 10)
  2. ch10 <- 1
  3. select {
  4. default:
  5. fmt.Println("default!")
  6. case e10 := <-ch10:
  7. fmt.Printf("1th case is selected. e10=%v.\n", e10)
  8. }

无论default case被放置在哪儿,都不会影响到我们在上面说明的分支选择规则。因此,这段代码被执行后,下面的内容依然会被打印到标准输出上。

  1. 1th case is selected. e10=1.

3. 更多惯用法

我们已经知道,接收操作符<-可以从一个通道中接收一个元素值,并可以通过与=:=联接把该元素值赋给一个或两个变量。如果同时对两个变量赋值,那么第二个变量便会指明当前通道是否已被关闭。在case关键字右边的接收语句当然也支持这种赋值形式。请看下面的代码:

  1. ch11 := make(chan int, 1000)
  2. // 省略若干条语句
  3. select {
  4. case e, ok := <-ch11:
  5. if !ok {
  6. fmt.Println("End.")
  7. break
  8. }
  9. fmt.Printf("%d\n", e)
  10. }

依据接收表达式的第二个结果值判断通道的关闭状态的这种方式我们应该已经很熟悉了。如果发现通道已被关闭,我们就打印提示内容并退出当前的select语句。其中的break语句的作用就是立即使当前select语句结束执行,无论当前的case中是否还存在未被执行的语句。

在真正的应用程序中,我们常常需要把select语句放到一个单独的Goroutine中去。现在,我们专门启用一个Goroutine来执行上面这条select语句:

  1. go func() {
  2. select {
  3. case e, ok := <-ch11:
  4. if !ok {
  5. fmt.Println("End.")
  6. break
  7. }
  8. fmt.Printf("%d\n", e)
  9. }
  10. }()

这样可以完全避免因select语句的执行而可能导致的死锁现象。我们再来关注这条select中的接收操作。由于select语句在被执行的时候只会对其中的某一个通道操作一次,所以上面这段代码最多只能从ch11通道中接收到一个元素值。为了连续地接收元素值,我们应该把select语句包含在一条for语句中,如下所示:

  1. go func() {
  2. for {
  3. select {
  4. case e, ok := <-ch11:
  5. if !ok {
  6. fmt.Println("End.")
  7. break
  8. } else {
  9. fmt.Printf("%d\n", e)
  10. }
  11. }
  12. }
  13. }()

上面这条for语句不包含任何子句。这就意味着被它包裹的select语句会被一次接一次地执行。当通道ch11中未包含任何元素值的时候,当前Goroutine会被阻塞在这条select语句上。直觉上,由于有了变量ok,我们应该可以在ch11通道被关闭后及时退出这条for语句。但是,当我们真正执行这条for语句的时候,标准输出上可能会出现这样的内容:

  1. .
  2. .
  3. .
  4. 47
  5. 48
  6. 49
  7. End.
  8. End.
  9. End.
  10. End.
  11. .
  12. .
  13. .

我们用原点“.”表示还有很多内容没有在这里展示。显然,在我们关闭ch11通道之后,那个for并没有被结束。break语句只是让当前的select语句结束执行而已。由于它的外层还有for语句,所以流程控制权马上又会重新由select语句掌管,如此往复。最终,这个Goroutine会被一直运行下去而无法结束,直到当前程序被终结。看来我们需要两条break语句结束这个循环。但是,对于被用来退出外层for循环的break语句来说,必须要有一个标志来触发它的执行。代码如下:

  1. go func() {
  2. var e int
  3. ok := true
  4. for {
  5. select {
  6. case e, ok = <-ch11:
  7. if !ok {
  8. fmt.Println("End.")
  9. break
  10. } else {
  11. fmt.Printf("%d\n", e)
  12. }
  13. }
  14. if !ok {
  15. break
  16. }
  17. }
  18. }()

可以看到,我们在for语句的前面先声明了两个变量。它们分别被用来存放通道ch11中的元素值和关闭标志。对于第二条break语句来说,仅有变量ok是必要的。那么我们为什么还要声明变量e呢?请注意,上面代码中的那个唯一的case后面,接收语句已经变成了这样:

  1. e, ok = <-ch11 // 在前一段示例代码中,它是这样的:e, ok := <-ch11

还记得吗?特殊标记:==代表着完全不同的赋值方式。前者会声明变量并给变量赋值,而后者则只能给已经声明的变量赋值。如果我们不事先声明变量e,那么这里被用来赋值的特殊标记只能依然是:=。这意味着,运行时系统会把它左边的eok都当作代表新变量的标识符来看待。

我们在这里再复习一下之前讲过的概念。如果在当前的代码块中已经存在了变量e,那么使用:=声明在内层代码块中的变量e就会遮蔽外层的同名变量。即使这两个同名变量的类型不同也会是这样。当然,在内层代码块中的那个变量的作用域之外并不存在这种遮蔽现象。如果这两个同名变量的作用域是完全相同的,那么自后面的变量被声明的那时起,之前声明的那个同名变量就会被永远地遮蔽。这种对已有标识符的复用也被称为变量的重声明。我们在第3章讲变量的时候对此做过介绍。

现在回到正题。按照我们的意愿,case右边的ok应该指代在for语句前面声明的那个变量,而不应该是一个新的变量。因为如果是新的变量,我们在case代码块中对变量ok的赋值将不会影响到该代码块之外的那个ok。其原因是,以case代码块为界,ok所指代的变量是不同的。在这种情况下,作用域更大的那个ok变量的值会一直是true。也就是说,for循环仍然永远不会退出。这就是我们在for语句的前面先声明变量e,并在case后使用=为它和ok赋值的根本原因。

在我们像前面那样使用两条break语句并正确地对ok变量进行声明和赋值之后,for语句的执行就可以在通道ch11被关闭之后立即结束了。其打印出的内容如下所示:

  1. .
  2. .
  3. .
  4. 47
  5. 48
  6. 49
  7. End.

在有些时候,我们并不想等到通道被关闭之后再退出循环,因为对于一些流程来说那样就太迟了。但是,针对通道的接收操作(以及发送操作)并不没有超时这一概念。所以我们只能使用某些辅助手段来达到此目的。

我们已经熟知了针对缓冲通道的各个操作的所有特性。那么我们是否可以利用它们的特性来满足这个需求呢?我们可以创建并初始化一个辅助的通道,并利用它模拟出操作超时的行为。请看下面的代码:

  1. timeout := make(chan bool, 1)
  2. go func() {
  3. time.Sleep(time.Millisecond)
  4. timeout <- false
  5. }()

我们声明并初始化了通道timeout,并把它作为超时触发器使用。之所以timeout的容量为1,是因为在超时的触发和实施方面都没有并发的需要。另外,使用单独的Goroutine来进行触发超时的操作是理所应当的,只有这样才不会影响主流程的执行。这个Goroutine在被运行之后,会先延迟1毫秒的时间,然后触发超时。这里所说的触发超时实际上就是给timeout通道发送一个元素值。那么,这个元素值是怎样让超时得以实施的呢?请看下面的代码:

  1. go func() {
  2. var e int
  3. ok := true
  4. for {
  5. select {
  6. case e, ok = <-ch11:
  7. if !ok {
  8. fmt.Println("End.")
  9. break
  10. } else {
  11. fmt.Printf("%d\n", e)
  12. }
  13. case ok = <-timeout:
  14. fmt.Println("Timeout.")
  15. break
  16. }
  17. if !ok {
  18. break
  19. }
  20. }
  21. }()

这段代码由实现前一个需求的代码修改而来。我们只是在select语句中新增了一个case。这个case的作用就是接收“超时信号”并执行超时子流程。在for循环进行期间,一旦通道timeout中有了新的元素值,第二个case几乎马上就会被执行。因为前一段代码会向timeout通道发送false,所以我们可以在这里直接将这个值赋给变量ok以表明超时已被触发、当前流程应被中断。该case会使用break语句结束当前的select语句的执行。又由于这时的变量ok已被赋值为false,所以这条for语句也会被结束执行。

至此,操作超时已经被成功的模拟出来了,它会运作良好的。在这行这段代码之后,标准输出上会出现这样的内容:

  1. .
  2. .
  3. .
  4. 47
  5. 48
  6. 49
  7. Timeout.

有些读者可能会说这模拟的不是真正的操作超时。是的,我在这里只是提供了一种可以灵活地让这个元素值接收流程结束的方法。确切地讲,这应该叫作流程执行超时。那么,对于每一个接收操作而言,实施单个操作的超时是否真的可行呢?

答案当然是肯定的。最简单的方式就是在每次迭代的开始都初始化一次timeout并在一个单独的Goroutine中布置好延时和“超时信号”的触发,就像下面这样:

  1. go func() {
  2. // 省略若干条语句
  3. for {
  4. timeout = make(chan bool, 1)
  5. go func() {
  6. time.Sleep(time.Millisecond)
  7. timeout <- false
  8. }()
  9. select {
  10. // 省略若干条语句
  11. }
  12. // 省略若干条语句
  13. }
  14. }()

这的确可以实现单个操作的超时。但是那个超时触发器开始计时的时间是不是有点儿早呢?如果这样是不是更好呢:

  1. go func() {
  2. var e int
  3. ok := true
  4. for {
  5. select {
  6. case e, ok = <-ch11:
  7. // 省略若干条语句
  8. case ok = <-func() chan bool {
  9. timeout := make(chan bool, 1)
  10. go func() {
  11. time.Sleep(time.Millisecond)
  12. timeout <- false
  13. }()
  14. return timeout
  15. }():
  16. fmt.Println("Timeout.")
  17. break
  18. }
  19. if !ok {
  20. break
  21. }
  22. }
  23. }()

在上面这段代码中,select语句的第二个case后的接收语句是关键。注意,这条接收语句中的通道表达式是由一个针对匿名函数的调用表达式(在这个case中的接收操作符<-和冒号“:”之间的那一段代码)代表的。该匿名函数使用在其中声明并初始化的timeout通道作为它的结果,并作为当前case语句中的接收语句的组成部分。在经过大约1毫秒的时间后,该接收语句会从timeout通道接收到一个元素值并把它赋给变量ok,从而恰当地执行了针对单个操作的超时子流程,这会适时地使当前的for语句被结束执行。

在这一实现中,我们利用了select语句的两个特性。

  • 在运行时系统开始执行select语句的时候,会先对它所有的case中的元素表达式和通道表达式进行求值。这样才使得在运行时系统选择要执行的case之前先制造出一个可用的超时触发器成为了可能。更具体地讲,在这些case被选择之前,第二个case后的接收语句会由下面这行代码替代:
  1. ok = <-timeout

这与前面的版本如出一辙。只不过这里的timeout通道是在每次开始执行select语句的时候才被声明并初始化出来的。

  • 运行时系统在选择select语句的case的时候,只要case有多个,它就肯定不会为某一个case而等待。只有当所有的case后的发送语句或接收语句都无法被立即执行的时候,它才会阻塞住当前的Goroutine。当然,前提是没有default case。在等待期间,只要发现有某一个case后的语句可以被立即执行,那么运行时系统就会立即执行这个case。在本例中,当无法立即从ch11通道中接收元素值的时候,运行时系统会随即判断是否可以立即接收timeout通道中的元素值。因此,一旦第一个case中的接收操作无法在1毫秒之内完成,我们给定的超时子流程就会被执行。

通过上面这一系列的示例和讲解,读者应该对select语句的编写、执行和惯用法都有所了解。注意,这其中的(以及之前的)很多内容都是只针对缓冲通道的。与非缓冲通道相关的各种使用方法和技巧,我们将会在下一小节予以揭晓。

7.2.5 非缓冲的Channel

如果我们在初始化一个通道的时候将其容量设置成0,或者直接忽略对容量的设置,那么就会使该通道称为一个非缓冲通道。与以异步的方式传递元素值的缓冲通道不同,非缓冲通道只能同步的传递我们发送给它的元素值。

1. Happens before

非缓冲通道这种同步传递元素值的特性是怎样实现的呢?这还要从与它有关的“happens before”原则说起。与缓冲通道相比,针对非缓冲通道的“happens before”原则有3个特别之处,具体如下。

  • 向此类通道发送元素值的操作会被阻塞,直到至少有一个针对该通道的接收操作开始进行为止。

  • 从此类通道接收元素值的操作会被阻塞,直到至少有一个针对该通道的发送操作开始进行为止。

  • 针对非缓冲通道的接收操作会在与之相对应的发送操作完成之前完成。

前两条规则保证了只有在针对同一通道的发送操作和接收操作都已经开始进行的时候,通过该通道的元素值传递才能够真正的开始。这也是“同步的传递”的真正含义。这两类操作哪一个先进行并不重要,重要的是它们要有配对的机会。只有这样传递动作才能有机会执行。

在进行针对非缓冲通道的发送操作之时,运行时系统会检查是否有针对同一个通道的接收操作正在进行。如果有,那么该接收操作必定正在被阻塞。这时,上述发送操作会唤醒第一个为此而被阻塞的接收操作,并与之配合完成一次元素值的传递动作。反之亦然。

对于第三条规则,我们需要特别注意。它是使非缓冲通道表现得与缓冲通道截然不同的最重要的一点。在缓冲通道中,由于元素值的传递是异步的,所以发送操作在成功向通道发送元素值之后就会立即结束。它不会关心是否有接收操作要接收该元素值,更别提该元素值是否已被成功接收的这个结果了。然而,针对非缓冲通道的操作在这方面的表现正好相反。发送操作在向非缓冲通道发送元素值的时候,会等待能够接收该元素值的那个接收操作。并且,只有确保该元素值被成功接收,它才会真正的完成执行。这进一步印证了非缓冲通道总会进行“同步的传递”的这一特性。

我们可以利用非缓冲通道的这种特性,实现多个Goroutine之间的同步。请看下面的示例:

  1. func main() {
  2. unbufChan := make(chan int)
  3. go func() {
  4. fmt.Println("Sleep a second...")
  5. time.Sleep(time.Second)
  6. num := <-unbufChan
  7. fmt.Printf("Received a integer %d.\n", num)
  8. }()
  9. num := 1
  10. fmt.Printf("Send integer %d...\n", num)
  11. unbufChan <- num
  12. fmt.Println("Done.")
  13. }

这段代码中共有4条打印语句。它们会以怎样的顺序打印相应的内容呢?我们让在主Goroutine中启用的那个Goroutine在运行之初先“睡”上1秒钟。原因是我们想看看针对unbufChan通道的发送操作是否真的会等待一个能够与之配对的接收操作。如果答案是否定的,那么很可能还没等那个新被启用的Goroutine真正开始运行,整个程序就已经被运行结束了。

在执行了该main函数之后,我们发现标准输出上的内容是这样的:

  1. Send integer 1...
  2. Sleep a second...
  3. Received a integer 1.
  4. Done.

这4行内容实际上体现了Goroutine和非缓冲通道的几个重要特性。先看前两行。第一行内容所对应的打印语句出现在go语句之后,但不是紧挨着这条go语句的那条语句。而第二行内容所对应的则是新启用的Goroutine中的第一条语句。请注意它们的位置关系。这说明新的Goroutine从启用到运行是需要一定的时间的。虽然这个时间很短暂,但是它也足够运行时系统执行主Goroutine中的好几条语句的了。在第二行内容被打印出来之后,运行时系统会继续运行主Goroutine中剩下的语句。如果针对unbufChan通道的发送操作不会被阻塞,那么运行时系统会在执行完这最后两条语句之后直接结束当前程序的运行。如果是这样的话,第三行甚至第二行的内容就应该是Done.。但是,实际情况并不是这样。这恰恰说明了主Goroutine中的发送操作在等待一个能够与之配对的接收操作。当那个新的Goroutine中的接收操作开始进行的时候,由于配对成功,元素值1才得以经由unbufChan通道被从主Goroutine传递至那个新的Goroutine。

我们再来看最后两行打印内容。如果unbufChan通道是带缓冲的,那么第四行内容Done.一定会先被打印出来。至于原因,我们在前面已经说明过了。但是,对于这里的非缓冲通道unbufChan来说,由于发送操作一定会等到相对应的接收操作完成之后才完成,所以这两行打印内容才会以这样的顺序展现出来。

通过前面这个示例,我们应该已经对非缓冲通道的“happens before”原则有真正的理解了。请记住它在这方面与缓冲通道的不同。

2. 单向的非缓冲通道

单向的非缓冲通道在表现形式上与我们在7.2.2节中讲述的单向通道并没有什么不同。因为在通道的类型字面量中并不会体现出通道是否带有缓冲。不过,正因为这一点,我们才更应该关注作为参数值被传入的单向通道的特性。请看下面的示例:

  1. func fecthPerson(origs chan<- Person) {
  2. origsCap := cap(origs)
  3. buffered := origsCap > 0
  4. goTicketTotal := origsCap / 2
  5. goTicket := initGoTicket(goTicketTotal)
  6. go func() {
  7. for {
  8. p, ok := fecthPerson1()
  9. if !ok {
  10. for {
  11. if !buffered || len(goTicket) == goTicketTotal {
  12. break
  13. }
  14. time.Sleep(time.Nanosecond)
  15. }
  16. fmt.Println("All the information has been fetched.")
  17. close(origs)
  18. break
  19. }
  20. if buffered {
  21. <-goTicket
  22. go func() {
  23. origs <- p
  24. goTicket <- 1
  25. }()
  26. } else {
  27. origs <- p
  28. }
  29. }
  30. }()
  31. }

上面展示了我们在7.2节中提及但并未展现的fecthPerson函数的实现。这个函数的功能是从某处取出欲处理的人员信息,并把它们发送给origs通道。我们在这里并不关心从哪里取出这些人员信息,所以这里用fecthPerson1函数代表人员信息取出操作。为了尽快地将欲处理的人员信息发送给origs通道,我们试图启用更多的Goroutine来并发地进行发送操作。顺便提一句,在无法确定这种并发是否会给程序性能带来正面影响的时候,我们常常会先以串行的方式进行相应的操作。

我们怎样确定并发的进行发送操作是否会给程序性能带来正面影响呢?请注意在fecthPerson函数体的开始处声明的那个变量buffered。它的值表示了通道origs是否带有缓冲。如果buffered变量的值是false,那么我们就没有必要并发地发送取出的人员信息。其根本原因是,非缓冲通道只能同步地传递元素值。在接收操作完成之前,发送操作是无法完成的。即使我们采用并发的方式发送人员信息,这些信息也只会被手递手地传送到处理它们的Goroutine中。对于非缓冲通道来讲,在接收方并没有并发地进行接收操作的时候,发送方并发地进行发送操作是没有任何意义的。这种方式反而会降低程序的性能,并给运行时系统带来无谓的负担。更进一步地说,在收发两端都有并发需求的情况下,使用非缓冲通道作为元素值传输介质是不合适的。除非双方都有着非常强烈的同步传递的需要。但是,在一般情况下,这两种需求是相悖的。

我们再来关注上述fecthPerson函数中其他值得说明的代码。在我们启用新的Goroutine以向origs通道发送人员信息的时候,还涉及了对goTicket通道的接收操作和发送操作。那么,goTicket通道又是做什么用的呢?goTicket通道实际上是我们为了限制该程序启用的Goroutine的数量而声明的一个缓冲通道。由于这种用法不在本小节的讨论范围之内,所以我们在这里只做简要的介绍。在初始化goTicket通道的时候,我们会向它发送与其容量相等的元素值。该通道的容量(在这里与它的长度相等)代表了我们可以启用的Goroutine的数量。每当我们要启用一个Goroutine的时候,就从该通道中接收一个元素值,以表示可被启用的Goroutine减少了一个。相应地,每当一个被启用的Goroutine的运行即将结束的时候,我们就应该向该通道发送一个元素值,以表示可被启用的Goroutine增加了一个。当相应的Goroutine的数量已经与该通道的容量相等(即该通道中已不存在任何元素值)的时候,新的启用Goroutine的动作(确切地说,是紧挨在go语句之前的针对该通道的接收操作)就会被组塞住。仅当该通道中又存在新的元素值(即之前启用的一些Goroutine即将被运行结束)的时候,新的启用Goroutine的动作才会得以继续。这是使用缓冲通道作为Goroutine票池的典型做法。这里所说的Goroutine票,其实代表了为了启用一个Goroutine而必须持有的一种令牌。我们使用这样的令牌来限制程序启用Goroutine的数量。

另一方面,既然我们需要并发的向origs通道人员信息,那么就应该在保证安全的情况下再关闭origs通道。也就是说,我们应该在发现已无可取的人员信息之后,检查被启用的相关Goroutine是否都已运行完毕。检查的具体方法就是查看goTicket通道长度是否与其容量相等。如果相等,那么就说明goTicket中的令牌都已被放回,所有相关的Goroutine都已经运行完毕。只有在确定这一点之后,我们才应该去关闭origs通道。不过,如果origs通道是非缓冲的(即变量buffered的值为false),那么我们就没必要做上述检查了。具体请参看fecthPerson函数中那条处于内层的for语句。

综上所述,单向的非缓冲通道的重点不在单向而依然在非缓冲。在函数或方法接受外部传来的通道类型的参数值的时候,应该先验明它的种类再依此来决定后续的操作。也正因为通道的这种特性不会被反映到类型上,所以我们只能使用cap函数作为辅助。

3. for语句与非缓冲通道

使用for语句接收通道中的元素值的时候,并不需要关注通道是否带有缓冲区。因为这种用法仅涉及接收操作。无论是非缓冲通道还是缓冲通道,对它们的接收操作都会被阻塞,直至通道中有元素值可接收为止。唯一的不同是,针对非缓冲通道的接收操作会在相应的发送操作完成之前完成,而这对于缓冲通道来说恰恰相反。

4. select语句与非缓冲通道

在使用select语句向某个非缓冲通道发送元素值的时候,我们应该特别注意。因为,与操作缓冲通道的select语句相比,它被阻塞的概率一般会大很多。其根本原因依然是非缓冲通道会以同步的方式传递元素值。如果运行时系统在该条select语句中选择要执行的case的时候不存在正在为此而等待的可配对操作,相应的case就肯定不会被选中。因为这样的case无法被立即执行。例如:

  1. unbufChan := make(chan int)
  2. select {
  3. case unbufChan <- i:
  4. case unbufChan <- i + 10:
  5. }

在执行上面的select语句的时候,它所在的Goroutine会被阻塞,直到在其他Goroutine中进行了对应的接收操作,如:

  1. <-unbufChan

记住,只有存在可配对的操作的时候,传递元素值的动作才可能真正开始。当然,我们为这样的select语句添加default case可以使它不被阻塞。但是这样并不会减小其中的case不被选中的概率。相比之下,这里的default case可能会被经常选中并执行。我们下面来看一个比较完整的示例:

  1. func main() {
  2. unbufChan := make(chan int)
  3. sign := make(chan byte, 2)
  4. go func() {
  5. for i := 0; i < 10; i++ {
  6. select {
  7. case unbufChan <- i:
  8. case unbufChan <- i + 10:
  9. default:
  10. fmt.Println("default!")
  11. }
  12. time.Sleep(time.Second)
  13. }
  14. close(unbufChan)
  15. fmt.Println("The channel is closed.")
  16. sign <- 0
  17. }()
  18. go func() {
  19. loop:
  20. for {
  21. select {
  22. case e, ok := <-unbufChan:
  23. if !ok {
  24. fmt.Println("Closed channel.")
  25. break loop
  26. }
  27. fmt.Printf("e: %d\n", e)
  28. time.Sleep(2 * time.Second)
  29. }
  30. }
  31. sign <- 1
  32. }()
  33. <-sign
  34. <-sign
  35. }

在上面的main函数中,我们启用了两个Goroutine分别对非缓冲通道unbufChan进行发送和接收的操作。发送操作会被进行10次,间隔时间为1秒。接收操作会一直被尝试,尝试的间隔至少为2秒。显然,收发操作可配对的情况每2秒会出现一次。执行main函数之后,打印内容如下:

  1. default!
  2. e: 11
  3. default!
  4. e: 13
  5. default!
  6. e: 5
  7. default!
  8. e: 17
  9. default!
  10. e: 9
  11. The channel is closed.
  12. Closed channel.

正如我们所料,default case会在收发操作无法配对的情况下被选中并执行。在这里,它被选中的概率是50%。

除此之外,我们故意在第一条select语句的两个case中分别向unbufChan通道发送小于10和大于等于10的整数。这使得我们可以很容易地从打印内容中分辨出每当收发操作可配对的时候哪一个case被选中了。实际上,这与在select语句中向缓冲通道发送元素值的情形是一样的。当可配对的情况出现的时候,这两个case是被随机选择的。运行时系统会先检查都有哪些case可以被立即执行,然后在通过伪随机算法选择它们中的一个。

总之,非缓冲通道是无法缓冲任何元素值的。因此,针对它们的收发操作能否被立即执行完全取决于当时是否有可配对的操作。我们在select语句中使用非缓冲通道的时候应该把这种特性作为重要的参考。例如,上面的这个示例给予了我们这样一个启发:使用非缓冲通道能够让我们非常方便地在接收端对发送端的操作频率实施控制。读者可以试着把上面main函数中的第一条select语句的default case去掉并再次执行main函数。看看它在执行效果上(注意各行内容被打印的时间以及间隔)与之前有什么不同,然后尝试解释这种不同。

非缓冲通道最重要的特性都体现在与之相关的“happens before”原则中。读者应该记住我们在本小节强调的那些与缓冲通道的“happens before”原则不同的部分。在forselect语句中,使用非缓冲通道与使用缓冲通道在形式上面并没有什么不同。但是,需要注意的是,非缓冲通道的特性让我们在使用它们的时候不得不留意操作配对以及由此带来的一些问题。

再次强调,在发送端每次都需要确保元素值已被接收的情况下,使用非缓冲通道是适合的。否则,我们应该选用缓冲通道来实现相关流程的异步化,并以此提高整个程序的性能。

7.2.6 time包与Channel

本小节主要讲解Go语言的标准库代码包time中的一些API的使用。为什么要在这里讲它们?因为,它们都是用通道来实现的。并且,它们还能够帮助我们对针对通道的收发操作进行更有效的控制。

1. 定时器

首先,我们先来看看time包中的结构体类型Timer。顾名思义,该类型的结构体会被作为定时器使用。我们不应该直接使用复合字面量来初始化该类型的变量。因为time.Timer类型中包含了一个包级私有的字段。并且,该字段是需要被显式地初始化的。这使得我们无法使用复合字面量正确的初始化一个time.Timer类型的值。

time包中,有两个函数可以帮助我们初始化time.Timer类型的值。它们是time.NewTimer函数和time.AfterFunc函数。

函数time.NewTimer的使用非常简单。我们在调用它的时候只传给它一个time.Duration类型的值就可以了。该函数的唯一参数的含义是,自定时器被初始化的那一刻起,距其到期时间需要多少纳秒。虽然这里的单位是纳秒,但是我们可以很方便地拼出我们需要的时间。因为time包已经包含了很多我们常会用到的time.Duration类型的常量。比如,我们可以这样表示3小时36分钟:

  1. 3*time.Hour + 36*time.Minute

如果我们要声明并初始化一个到期时间距此时的间隔为3小时36分钟的定时器的话,就应该这样编写代码:

  1. t := time.NewTimer(3*time.Hour + 36*time.Minute)

注意,这里的变量t*time.NewTimer类型的,而不是time.NewTimer类型的。这个time.NewTimer类型的指针类型的方法集合包含了两个方法,即Reset方法和Stop方法。Reset方法被用于重置定时器。也就是说,我们初始化的定时器是可以被复用的。该方法会返回一个bool类型的值。Stop方法被用来停止定时器。该方法也会使用一个bool类型值作为它的结果。如果该结果为false,就说明该定时器在之前已经到期或者已经被停止了。除了这两种情况,该结果都应该为trueReset方法对其结果值的设定策略与Stop方法如出一辙。不过,Reset方法的返回值与当次重置操作是否成功无关。换句话说,无论怎样,一旦Reset方法被执行结束,就说明该定时器已被重置。与此不同,如果Stop方法的返回值是false,那么就说明此次调用是无效的,即它并没有对定时器现有的状态产生任何改变。

我们刚刚说到了定时器的到期。那么这个到期的事件是怎样被传达的呢?实际上,这个传达的途径就是通道,即time.NewTimer类型的字段CC是一个chan time.Time类型的缓冲通道。这意味着,一旦触及到期时间,定时器就会向自己的C字段发送一个time.Time类型的元素值。这个元素值代表了该定时器的绝对到期时间。与之对应,我们在调用time.NewTimer函数时传入的那个time.Duration类型值就是该定时器的相对到期时间。这两者之间的关系一定是:

  1. <初始化时的绝对时间> + <相对到期时间> == <绝对到期时间>

通过定时器的字段C,我们可以及时得知定时器到期的这个事件的来临,并对此做出响应。例如,有这样一段代码:

  1. t := time.NewTimer(2 * time.Second)
  2. now := time.Now()
  3. fmt.Printf("Now time: %v.\n", now)
  4. expire := <-t.C
  5. fmt.Printf("Expiration time: %v.\n", expire)

它被执行之后,标准输出上会被打印出如下内容:

  1. Now time: 2014-04-01 16:00:10.2329909 +0800 +0800.
  2. Expiration time: 2014-04-01 16:00:12.2331053 +0800 +0800.

可以看到,即使我们在初始化定时器之后马上获取了当前时间,它与定时器被初始化的时间还是有少许偏差的。这个偏差在作者的机器上是微秒级的。不过,这个示例也已经能够证实前述的那3个时间的关系了。

当然,在实际的场景中,我们不会如此简单地使用定时器。现在,我们就来看看它的典型使用方式。还记得我们在7.2.4节的最后实现的那个超时触发器吗?它实际上就是一个简易版本的定时器。现在我们就用官方的定时器来替换我们自己编写的那个版本。不过,为了方便对比,我们先重现一下之前的简易版本定时器以及相关的case

  1. case ok = <-func() chan bool {
  2. timeout := make(chan bool, 1)
  3. go func() {
  4. time.Sleep(time.Millisecond)
  5. timeout <- false
  6. }()
  7. return timeout
  8. }():
  9. fmt.Println("Timeout.")
  10. break

下面,是使用官方的定时器的版本:

  1. case <-time.NewTimer(time.Millisecond).C:
  2. fmt.Println("Timeout.")
  3. ok = false
  4. break

在这两个版本中,case之后的通道表达式的作用是类似的。它们都是先声明并初始化一个通道,做好定时向该通道发送元素值的设定,最后将这个通道作为其结果值。不过,后者显然让我们省去了很多的代码。

其实,我们在上面展示的表达式:

  1. time.NewTimer(time.Millisecond).C

与我们调用time包的After函数的效果相当。如用time.After函数对此进行等价替换的话,那个通道表达式会是这样:

  1. time.After(time.Millisecond)

实际上,time.After函数的实现代码正如前者。它只能算是一个为我们屏蔽了time.Timer类型的内部实现的快捷函数而已。

正如前文所述,定时器是可以被复用的。因此,像前面的示例那样在case后的接收语句中初始化定时器难免有些浪费。下面的使用定时器的方式会更加节省资源:

  1. go func() {
  2. // 省略若干条语句
  3. var timer *time.Timer
  4. for {
  5. select {
  6. // 省略若干条语句
  7. case <-func() <-chan time.Time {
  8. if timer == nil {
  9. timer = time.NewTimer(time.Millisecond)
  10. } else {
  11. timer.Reset(time.Millisecond)
  12. }
  13. return timer.C
  14. }():
  15. fmt.Println("Timeout.")
  16. ok = false
  17. break
  18. }
  19. // 省略若干条语句
  20. }
  21. }()

可以看到,我们在for语句的前面先声明了一个*time.Timer类型的值,然后在相关case之后声明的匿名函数中尽可能地复用它。虽然这比每次执行select语句时都初始化一个定时器的那种方式要啰嗦一些,但是在for循环的迭代次数很多的情况下,这样做还是很有必要的。虽然这比最初的版本使用的代码还要多一点点,但是它绝对不会浪费资源。

定时器永远不需要向它的C字段发送第二个元素值。这是因为,定时器一旦到期就意味着在我们重置它之前它无法被再次使用。正因为如此,C的容量仅为1就足够了。注意,如果我们在定时器到期之前停止了它,那么该定时器的字段C也就没有机会缓冲任何元素值了。更具体地讲,若调用定时器的Stop方法的结果值为true,那么在这之后再去试图从它的C字段中接收元素是不会有任何结果的。更重要的是,这样做还会使当前Goroutine永远被阻塞!再次强调,重置已被停止的定时器是使它恢复如初的唯一方法。

另一方面,如果定时器到期了,但由于某种原因我们未能及时从它的字段C中接收那个元素值,那么就意味着我们错过了处理到期事件的时机。不过,在一些场景下,这种情况常常会发生,同时它也是正常的。前面的示例展示的就是这样的场景。定时器到期但未被处理就意味着其他case被执行了。这正是我们所期望的。但是,在这种情况下。定时器的字段C中会一直缓冲着那个唯一的元素值,即使在该定时器被重置之后也会是这样。因此,我们总是应该及时从定时器的字段C中接收元素值,否则会影响对定时器的复用。

另外,还有一点需要注意,那就是:我们在设定定时器的相对到期时间的时候,一定要让这个纳秒数为一个正整数。否则,定时器在被初始化或重置之时就会立即到期。在这之后,当我们试图从它的字段C接收元素值的时候,就会立即得到它而不会有任何延时。显然,这样的定时器就失去了存在的意义了。

除了上述讲到的API之外,time包还为我们提供了定时器的另一种使用方式。这种使用方式使得定时器在到期事件到来之时不向其C字段发送代表绝对到期时间的元素值,而是执行我们指定的函数。如果说前者是定时器默认的处理到期事件的方法的话,那么后者就可以被称作自定义方法。示例代码如下:

  1. var t *time.Timer
  2. f := func() {
  3. fmt.Printf("Expiration time: %v.\n", time.Now())
  4. fmt.Printf("C's len: %d\n", len(t.C))
  5. }
  6. t = time.AfterFunc(1*time.Second, f)
  7. time.Sleep(2 * time.Second)

我们通过调用time.AfterFunc函数初始化一个定时器。time.AfterFunc函数需要两个参数。第一个参数是定时器的相对到期时间,而第二个参数就是一个我们自定义的函数。该函数可以由任何无参数声明且无结果声明的函数或方法表示。在这样的函数或方法中,我们可以做的事情就很多了。这显然比先尝试从定时器的C字段中接收元素值然后再为此做出响应要方便得多。在执行上面这段代码之后,如下内容会被打印到标准输出:

  1. Expiration time: 2014-04-02 11:46:27.9564585 +0800 +0800.
  2. C's len: 0

我们在这段代码的最后让当前Goroutine“睡眠”了2秒钟以确保打印内容的完整。这样做的原因是,time.AfterFunc的调用不会被阻塞。它会以异步的方式在到期事件来临的那一刻执行我们自定义的函数。正如第二行内容显示的那样,定时器的字段C并没有缓冲任何元素值。这也说明了,在给定了自定义函数之后,默认的处理方法(向C发送代表绝对到期时间的元素值)就不会被执行了。除了这个特点之外,我们操纵使用time.AfterFunc函数初始化的定时器并不会有什么特别。它的Reset方法和Stop方法的行为及其结果值的设定策略都不会因此而改变。

我们常常把定时器作为超时触发器使用。从某种角度看,它的作用与time.Sleep函数有些类似。但是,定时器的一个明显的优势是,由于它使用缓冲通道作为告知到期事件的途径,使得我们可以异步地使用它。虽然使用time.Sleep函数和一些附加代码也可以做到这一点,但是这样显然会导致更多的代码和资源的浪费。正如我们在前面的示例中展现的那样。另外,通过time.AfterFunc函数初始化定时器,可以让我们更加灵活地使用它。这种初始化方式适合被用来执行定时任务。只要我们把它与定时器的Reset方法结合起来使用,就可以满足每隔一段时间执行一次任务的需求了。

2. 断续器

结构体类型time.Ticker表示了断续器的静态结构。所谓断续器,就是周期性的传达到期事件的装置。这种装置的行为方式与仅有秒针的钟表有些类似,只不过断续器的传动间隔时间可以不是1秒钟。另外,断续器与定时器也是相似的。但是,定时器在被重置之前只会传达一次到期事件,而断续器会持续工作直到被停止。

断续器传达到期事件的途径也是它的字段C,而C的类型也是元素类型为time.Time的通道类型。每隔一个指定的时间,断续器就会向此通道发送一个代表了当次的绝对到期时间的元素值。值得注意的是,断续器的字段C的容量依然是1。如果断续器在向它的字段C发送新的元素值的时候发现旧值还未被接收,那么就会取消当次的发送操作。这相当于丢弃了当次的到期事件而不予以传达。更确切地说,后续的发送操作的成功与否都由断续器的字段C中的可用缓冲空间来决定。

现在,我们来看看怎样初始化一个断续器,代码如下:

  1. var ticker *time.Ticker = time.NewTicker(time.Second)

初始化断续器的函数的名称依然以单词“New”开始。time.NewTicker函数接受一个time.Duration类型的参数。该参数代表了我们前面所说的周期,单位是纳秒。上面这段代码初始化了一个传达到期事件的间隔时间为1秒的断续器,并把它赋给了一个*time.Ticker类型的变量。该类型的方法集合中只有一个方法,即Stop方法。它的功能是停止断续器。这与定时器的Stop方法的功能是完全相同的。一旦断续器被停止,它就不会再向其C字段发送任何元素值了。但如果当时此C字段中已经有了一个元素值,那么该元素值会一直在那里,直至被接收。

断续器与定时器的适用场景是完全不同的,把它当作超时触发器来使用是不适合的。因为它对到期事件的传达虽然可能被放弃,但绝不会被延误。也就是说,断续器一旦被初始化,它所有的到期时间就都是固定的了。而超时触发器则需要依据每次具体操作(初始化或重置)的开始时间来决定绝对的超时时间。这也是二者的重要区别之一。

固定不变的到期时间恰恰使断续器非常适合被作为定时任务的触发器。例如,在一个定时的执行数据修补任务的程序中,为了避免对其他正常的数据库操作产生影响,我们要求两次任务执行之间的最短间隔时间为10分钟。我们可以这样编写满足这一需求的代码:

  1. var ticker *time.Ticker = time.NewTicker(10 * time.Minute)
  2. ticks := ticker.C
  3. // 省略若干条语句
  4. go func() {
  5. for _ = range ticks {
  6. if !patch() {
  7. break
  8. }
  9. _, ok := <-ticks
  10. if !ok {
  11. break
  12. }
  13. }
  14. }()

这段代码中的函数patch就代表了数据修补任务。无论执行该任务的耗时是怎样的,从前一次任务执行完成到后一次任务执行开始的间隔时间总会大于10分钟。因为我们在那个for循环中额外添加了一条接收语句。这条接收语句的执行会增加下一次迭代的执行延时。由于for语句在准备执行下一次迭代之前会等待两个到期事件,所以执行数据修补任务的间隔至少是一个周期(即10分钟)。

有时候,我们只需要断续器的C字段而不需要调用它的Stop函数。在这种情况下,我们就可以使用time包为我们提供的一个快捷函数来初始化一个断续器。这个快捷函数就是time.Tick函数。time.Tick函数接受一个代表到期事件传达周期的参数,并会返回刚被初始化完成的那个断续器的字段C作为结果。示例如下:

  1. ticks := time.Tick(time.Second)

代码包time中的定时器(time.Timer)和断续器(time.Ticker)都充分利用了缓冲通道的异步特性来传达到期事件。我们可以使用它们对程序的流程进行更加灵活的控制。不过,它们对应着不同的流程控制策略。我们可以利用定时器设定某一个操作或任务的超时时间。这相当于对它们的完成时间点进行控制,而断续器常被我们用来设定操作或任务的开始时间点。从这个角度看,它们面向的是两个看似对立又相互关联的方面。通过对它们的组合使用,我们可以实现对时间敏感的流程的有效控制。