9.5 中间件的实现

在编写了网络爬虫框架的基本数据结构,以及设计了构建它所需的所有组件的接口之后,我们就可以开始动手实现它了。与接口设计的顺序正好相反,我们会从最基础的组件(即各种中间件工具)开始写起。下面我们马上开始。

注意 本节描述的部分实现会在9.8节中被修改。因此,它们会与随书项目中的相应代码不一致。

9.5.1 通道管理器

通道管理器会对网络爬虫框架中所涉及的所有通道进行统一的管理。这里所说的管理包括初始化、热替换和关闭等。我们在设计其接口类型的时候,已经知悉了初始化和关闭这两个功能,但是却没有提及过热替换功能。实际上,热替换功能正是设计这个通道管理器的重要目的之一。

1. 热替换

我们在这里所说的热替换的作用是,在替换被用于传输某类数据的通道的时候,使用该通道的各方不会受到丝毫影响。通过这样的隔离方式,调度器就不用担心因通道替换而带来的一些问题了。

为了让读者能够更加清楚地看到这些问题。我们先来进行一个试验。请想象这样一个场景:我们使用两个Goroutine分别对同一个通道进行发送操作和接收操作。首先,我们需要先声明几个变量:

  1. var chan1 chan int
  2. var chanLength int = 18
  3. var interval time.Duration = 1500 * time.Millisecond

变量chan1代表的是作为操作目标的通道。由于需要多次初始化该通道,所以我们使用chanLength变量来明确它的长度。在这里,我们把这个长度值设定为18。而第三个变量interval表示的则是针对通道chan1的发送操作和接收操作的进行间隔。设置这样一个进行间隔的目的是让我们运行这个试验程序的时候更容易看清其过程。

好了,我们现在把上面的那3行代码放入到一个命令源码文件中并开始编写它的main函数。在main函数中,我们需要做3件事。第一件事是初始化通道chan1

  1. chan1 = make(chan int, chanLength)

这也是对该通道的第一次初始化。第二件事,启用一个Goroutine专门向该通道发送元素。请看下面的代码:

  1. go func() {
  2. for i := 0; i < chanLength; i++ {
  3. if i > 0 && i%3 == 0 {
  4. fmt.Println("Reset chan1...")
  5. chan1 = make(chan int, chanLength)
  6. }
  7. fmt.Printf("Send element %d...\n", i)
  8. chan1 <- i
  9. time.Sleep(interval)
  10. }
  11. fmt.Println("Close chan1...")
  12. close(chan1)
  13. }()

在这个专用的Goroutine中,我们会每隔1.5秒向chan1通道发送一个元素值。发送的总数量与该通道的长度值相同。在每次发送之前,我们还会判断当次发送的是第几个元素值。如果符合条件,我们就会先对chan1进行重新初始化。读者不用太在意这个重新初始化的条件。这是我们为了让重新初始化操作的进行更加有规律而做的设定,并没有其他含义。最后,在所有的发送操作都完成之后,我们还会关闭通道chan1。注意,为了能够清楚地看到程序的运行的过程,我们在程序中适当地添加了一些打印语句。这对我们说清问题会很有帮助。

很显然,我们最初创建的通道值并不会被填满。因为,在第四次迭代之初(由重新初始化条件可知),变量chan1会与当前的值解除绑定,并且与新创建的通道值建立绑定。在这之后,发送语句chan1 <- i实际上是把i的值发送给了新创建的通道值,而我们最初初始化的那个通道值中的元素值的数量就永远不会再增加了。相似地,我们之后赋给chan1变量的每个通道值都最多只会包含3个元素值。这就是我们将要关注的通道值的替换动作。

为了展示上述替换动作所带来的影响,我们还需要定时地从chan1处接收元素值。负责此任务的代码被包含在了函数receive中。我们在main函数中要做的第三件事就是调用这个函数。它只接受一个参数。该参数的值即是chan1的值:

  1. receive(chan1)

函数receive的完整声明如下:

  1. func receive(chan2 chan int) {
  2. fmt.Println("Begin to receive elements from chan1...")
  3. timer := time.After(30 * time.Second)
  4. Loop:
  5. for {
  6. select {
  7. case e, ok := <-chan2:
  8. if !ok {
  9. fmt.Println("--Closed chan1.")
  10. break Loop
  11. }
  12. fmt.Printf("Received an element: %d\n", e)
  13. time.Sleep(interval)
  14. case <-timer:
  15. fmt.Println("Timeout!")
  16. break Loop
  17. }
  18. }
  19. fmt.Println("--End.")
  20. }

在这里,我们同样用到了for语句。不过,为了防止receive函数的执行永远无法结束,我们还用到了标记语句和select语句。在一开始,我们先利用time.After函数创建了一个定时器,超时时间为30秒。然后,我们试图定时且不断地从当前函数的参数chan2中接收元素值。与main函数中的发送操作相同,该接收操作的间隔时间也为1.5秒。注意,我们把该接收操作作为了select语句中的一个case表达式。而另一个case表达式则是针对定时器timer的接收操作。此select语句与外层的for语句和以Loop为标识的标记语句一起构成了带操作超时设置的被用于从通道chan2接收元素值的程序片段。

如此组合的含义是每隔1.5秒尝试从chan2处接收元素值并把它们打印到标准输出,但只要这些操作耗费的总时间超过了30秒就立刻放弃当前以及后续的所有尝试并结束for语句的执行。此外,如果chan2代表的通道被关闭,程序也会跳出for语句。这样一来,我们就有效地避免了因接收操作的阻塞而导致的程序执行停滞的问题。

在我们使用go run命令运行了当前的命令源码文件(无论包含前述代码的文件叫什么名字)之后,标准输出上会出现如下内容:

  1. Receive element from chan1...
  2. Send element 0...
  3. Received an element: 0
  4. Send element 1...
  5. Received an element: 1
  6. Send element 2...
  7. Received an element: 2
  8. Reset chan1...
  9. Send element 3...
  10. Send element 4...
  11. Send element 5...
  12. Reset chan1...
  13. Send element 6...
  14. Send element 7...
  15. Send element 8...
  16. Reset chan1...
  17. Send element 9...
  18. Send element 10...
  19. Send element 11...
  20. Reset chan1...
  21. Send element 12...
  22. Send element 13...
  23. Send element 14...
  24. Reset chan1...
  25. Send element 15...
  26. Send element 16...
  27. Send element 17...
  28. Close chan1...
  29. Timeout!
  30. --End.

我们先来关注最关键的部分。请注意这样的内容:Reset chan1…。它是在main函数中的通道类型变量chan1即将被重新初始化的时候被打印出来的。在该内容被第一次打印出来之前,每当针对通道chan1的发送操作被进行一次,都紧接着会有一行代表了针对该通道的接收操作已被执行的内容被打印出来,如下所示:

  1. Send element 2...
  2. Received an element: 2

这表明我们的程序正在按照我们的意图运行。一旦chan1通道中出现了新的元素值,receive函数中的程序就会立刻接收并打印它。

但是,当第一个Reset chan1…被打印出来之后,我们就只见发送、不见接收了,如:

  1. Reset chan1...
  2. Send element 3...
  3. Send element 4...
  4. Send element 5...
  5. Reset chan1...
  6. Send element 6...
  7. Send element 7...

而输出的最后3行

  1. Close chan1...
  2. Timeout!
  3. --End.

也表明,直到main函数的代码在发送完所有元素值之后关闭了通道chan1receive函数中的接收操作仍然没有被唤醒(注意,我们把接收表达式<-chan2的结果值赋给了两个变量,读者应该知道这意味着什么)。这是怎么回事呢?

正如我们在前面讲述的那样,一旦在main函数中的针对chan1的重新赋值操作完成,后续的发送操作就会把元素值发送到新的通道值中。而在receive函数中的接收操作却仍然针对的是之前的那个通道值。这样就使本该操作同一个通道值的两个操作不再配对了。接收操作针对的通道中不会再有新的元素值,而那个发送操作向另一个通道发送的元素值也不会被任何接收方接收。其根本原因是,对变量的重新赋值的效果是无法被传递的。当我们想传递这种改变的时候,上述程序是无法让我们如愿的。显然,这不是Go语言本身的问题,而是我们的编码方式的问题。

经过上述试验我们可以看出,针对原有通道的替换动作导致了相应的接收操作的异常,同时其结果也无法被传递。但是,我们的网络爬虫框架需要的恰恰是这种传递,因为被用来传输各类数据的通道可能会在爬取流程的执行过程中被替换。由于这个爬取流程是由网络爬虫框架中的众多组件共同参与执行的,所以必须要让对通道的替换动作瞬时被各个组件觉察到。

我们已经知道,热替换可以解决此类问题。这一设计技巧的灵感仍然来自于面向对象编程世界中的“面向接口编程”原则。该原则主张模块对外提供抽象的接口而不是具体的实现。

依照这个思路,我们需要对前面的代码进行一些改造。首先,我们需要声明一个被用来获取通道的函数:

  1. func getChan() chan int {
  2. return chan1
  3. }

这个函数就相当于我们为通道chan1的外部使用方提供的接口。然后,我们需要让receive函数中的代码获取该通道的途径由它的参数chan2转变为对函数getChan的调用。这只需要修改极少的代码:

  1. func receive() {
  2. fmt.Println("Receive element from chan1...")
  3. timer := time.After(30 * time.Second)
  4. Loop:
  5. for {
  6. select {
  7. case e, ok := <-getChan():
  8. // 省略若干条语句
  9. case <-timer:
  10. // 省略若干条语句
  11. }
  12. }
  13. fmt.Println("--End.")
  14. }

注意,只有select语句中的第一个case表达式被修改了。这意味着,select语句的每次执行都会伴随着对调用表达式getChan()的求值。而该调用表达式的求值结果永远会是通道chan1在当时的值。这就避免了因对chan1的重新初始化而导致的元素值无法被正确传递的问题。

通过这样的改造之后,我们再来运行这个命令源码文件,标准输出上会出现如下内容:

  1. Receive element from chan1...
  2. Send element 0...
  3. Receive a element: 0
  4. Send element 1...
  5. Receive a element: 1
  6. Send element 2...
  7. Receive a element: 2
  8. Reset chan1...
  9. Send element 3...
  10. Receive a element: 3
  11. Send element 4...
  12. Receive a element: 4
  13. Send element 5...
  14. Receive a element: 5
  15. Reset chan1...
  16. Send element 6...
  17. Receive a element: 6
  18. Send element 7...
  19. Receive a element: 7
  20. Send element 8...
  21. Receive a element: 8
  22. Reset chan1...
  23. Send element 9...
  24. Receive a element: 9
  25. Send element 10...
  26. Receive a element: 10
  27. Send element 11...
  28. Receive a element: 11
  29. Reset chan1...
  30. Send element 12...
  31. Receive a element: 12
  32. Send element 13...
  33. Receive a element: 13
  34. Send element 14...
  35. Receive a element: 14
  36. Reset chan1...
  37. Send element 15...
  38. Receive a element: 15
  39. Send element 16...
  40. Receive a element: 16
  41. Send element 17...
  42. Receive a element: 17
  43. Close chan1...
  44. --chan1 closed.
  45. --End.

这与我们在前面看到的输出内容有两个不同。

  • 无论main函数中的代码对chan1的重新初始化(由输出内容Reset chan1…代表)有多少次,receive函数中的代码都能够及时、正确地从通道中接收元素值。这从
  1. Send element 8...
  2. Receive a element: 8

  1. Send element 16...
  2. Receive a element: 16

等输出内容上就能看得出来。

  • main函数中的代码关闭chan1变量代表的通道的时候(由输出内容Close chan1…代表),receive函数中的代码立即就能觉察到并打印出—chan1 closed.

上述的两个不同表明,收发双方始终操作的是同一个通道。在任何时刻都没有出现操作不配对的情况。这完全得益于作为接口的函数getChan。它能为我们提供热替换功能。

当然,我们在网络爬虫框架中也可以运用这一设计技巧。通道管理器就是一个绝佳的应用场景。实际上,我们先前声明的通道管理器的接口ChannelManager中的方法ReqChanRespChanItemChanErrorChan就相当于分别针对于各类通道的接口,而这些通道的具体实现则被隐藏在了通道管理器的内部。需要使用它们的程序仅能通过这些接口来获得对应的通道实例。

据此,我们已经可以给出通道管理器实现的基本结构了:

  1. // 通道管理器的实现类型。
  2. type myChannelManager struct {
  3. channelLen uint // 通道的长度值。
  4. reqCh chan base.Request // 请求通道。
  5. respCh chan base.Response // 响应通道。
  6. itemCh chan base.Item // 条目通道。
  7. errorCh chan error // 错误通道。
  8. status ChannelManagerStatus // 通道管理器的状态。
  9. }

结构体类型myChannelManager中的这6个字段是与接口类型ChannelManager中的方法一一对应的。所以在这里我们也不需要再次解释。不过,这只是第一个版本。后面我们还会有所添加。

2. 初始化

一旦有了基本结构,我们需要考虑的一个很重要的方面就是对这些字段的初始化方式。对于myChannelManager来说,我们只给定通道的长度就足够了。我们可以编写出一个名为NewChannelManager的函数,并让其负责创建并初始化通道管理器。该函数只需接受一个代表了通道长度的参数值。记得吗?ChannelManager类型中包含了一个名为Init的方法。该方法被定义为初始化通道管理器的方法。因此,我们还应该在NewChannelManager函数中调用myChannelManager类型值的Init方法,以达到分清职责和重用初始化代码的目的。所谓分清职责是指,让Init方法全权负责对其所属值的初始化工作,而NewChannelManager函数仅负责校验调用方传来的参数值。这样做也可以把初始化通道管理器的代码集中于一处,以达到复用的目的。

好了,我们现在给出NewChannelManager函数的声明:

  1. // 创建通道管理器。
  2. // 如果参数channelLen的值为0,那么它会由默认值代替。
  3. func NewChannelManager(channelLen uint) ChannelManager {
  4. if channelLen == 0 {
  5. channelLen = defaultChanLen
  6. }
  7. chanman := &myChannelManager{}
  8. chanman.Init(channelLen, true)
  9. return chanman
  10. }

可以看到,该函数的函数体中用到了一个名为defaultChanLen的包级私有的全局变量。它代表了默认的通道长度。当然,它只在参数channelLen的值为0的时候才起作用,并充当即将被初始化的通道管理器中的通道长度。注意,chanman变量的类型是myChannelManager,而不是myChannelManager。这意味着我们想把myChannelManager类型作为ChannelManager接口类型的实现类型。

在校验了参数channelLen的值之后,新创建的通道管理器chanmanInit方法将被调用。下面我们来看看这个方法都应该做哪些事。

方法Init首先应该初始化chanman中的所有字段。有了通道长度channelLen,这项任务就很容易了。第一个版本的Init方法的声明如下:

  1. func (chanman *myChannelManager) Init(channelLen uint, reset bool) bool {
  2. if channelLen == 0 {
  3. panic(errors.New("The channel length is invalid!"))
  4. }
  5. chanman.channelLen = channelLen
  6. chanman.reqCh = make(chan base.Request, channelLen)
  7. chanman.respCh = make(chan base.Response, channelLen)
  8. chanman.itemCh = make(chan base.Item, channelLen)
  9. chanman.errorCh = make(chan error, channelLen)
  10. chanman.status = CHANNEL_MANAGER_STATUS_INITIALIZED
  11. return true
  12. }

在该方法中,我们进行了非常严格的参数检查。一旦参数channelLen的值不符合要求,就会引发一个运行时恐慌。对于一个公开的初始化方法来说,这样做并不过分。把隐患扼杀在摇篮之中总要比困难地排查隐晦的错误好得多。

方法Init需要做的另一件事是避免意外的重复初始化。这项工作同样重要。因为由于使用方的误操作而使通道管理器被意外地重新初始化是一件很糟糕的事情。为此,我们需要再向Init方法中添加两行代码:

  1. func (chanman *myChannelManager) Init(channelLen uint, reset bool) bool {
  2. if channelLen == 0 {
  3. panic(errors.New("The channel length is invalid!"))
  4. }
  5. if chanman.status == CHANNEL_MANAGER_STATUS_INITIALIZED && !reset {
  6. return false
  7. }
  8. // 省略若干条语句
  9. return true
  10. }

如果通道管理器已处于已初始化状态,那么我们就应该去检查参数reset的值。如果该值为false,那么就说明Init方法的调用方并不想对通道管理器进行重新初始化。这时我们就应该忽略后续的所有操作并直接返回false

在一般的情况下,这样的逻辑可以有效地避免意外重新初始化的情况发生。但是别忘了,通道管理器是一个公用的工具。它的所有方法都可能会被并发地调用。虽然通道本身是并发安全的,但是与它们绑定在一起的变量却不是。我们需要采用一些手段保证针对通道管理器中的那些通道类型的字段的读写操作的并发安全。显然,sync包中的读写锁能够满足这样的需求。

我们需要在myChannelManager类型的基本结构加入一个sync.RWMutex类型的字段:

  1. rwmutex sync.RWMutex // 读写锁。

通道管理器的Init方法会对内部进行初始化,其中就包括了对那4个通道类型的字段赋值。因此,我们需要对刚刚编写的myChannelManager类型的Init方法加以改造。同样是添加两行代码:

  1. func (chanman *myChannelManager) Init(channelLen uint, reset bool) bool {
  2. if channelLen == 0 {
  3. panic(errors.New("The channel length is invalid!"))
  4. }
  5. chanman.rwmutex.Lock()
  6. defer chanman.rwmutex.Unlock()
  7. if chanman.status == CHANNEL_MANAGER_STATUS_INITIALIZED && !reset {
  8. return false
  9. }
  10. // 省略若干条语句
  11. return true
  12. }

注意,在我们调用rwmutexLock方法之后,要记得立即编写一条defer语句,并包含针对该读写锁的Unlock方法的调用表达式。这样才能够保证当前函数被退出执行时,该读写锁总会被解锁,因而消除死锁的隐患。

经过了3个版本,我们的Init方法已经基本完善。不过,为了让该方法的接收者类型成为ChannelManager接口类型的实现类型,我们还需要为它继续编写几个方法。

3. 关闭

具体来讲,通道管理器的Close方法需要做3件事:检查状态、关闭所有通道和设置状态。当然,这一切依然要在读写锁的保护下进行。

只有在通道管理器正处于已初始化状态的时候,关闭通道才有意义。所以,一旦通道管理器未处于此状态就应该直接返回false。还记得吗?Close方法的唯一结果是bool类型的。关于关闭内部通道的方法,我们就不多说了。这需要用到内建函数close。最后,我们需要把常量CHANNEL_MANAGER_STATUS_CLOSED的值赋给当前的myChannelManager类型值的status字段。还记得吗?CHANNEL_MANAGER_STATUS_CLOSED常量是我们编写ChannelManager接口类型的时候声明的。它代表了通道管理器的已关闭状态。综上所述,我们应该这样编写myChannelManager类型的指针方法Close

  1. func (chanman *myChannelManager) Close() bool {
  2. chanman.rwmutex.Lock()
  3. defer chanman.rwmutex.Unlock()
  4. if chanman.status != CHANNEL_MANAGER_STATUS_INITIALIZED {
  5. return false
  6. }
  7. close(chanman.reqCh)
  8. close(chanman.respCh)
  9. close(chanman.itemCh)
  10. close(chanman.errorCh)
  11. chanman.status = CHANNEL_MANAGER_STATUS_CLOSED
  12. return true
  13. }

4. 获取通道

我们先前为通道的获取而设计的方法有4个:ReqChanRespChanItemChanErrorChan。这4个方法的实现应该是非常相似的。我们在这里以ReqChan方法为例。ReqChan方法的实现中必定包含了对当前myChannelManager类型值的reqCh字段的读操作。因此我们在这里依然会用到读写锁rwmutex

ReqChan这样的通道获取方法中,我们需要重点考虑的应该是在当前通道管理器处于不同状态的时候的处理方式。在获取通道的时候,通道管理器的状态理应是已初始化的。当发现不被期望的状态时,我们应该给予通道获取者必要的提示,以帮助它们作判断和采取下一步行动。不过,这部分职责并不应该是通道获取方法应该有的。因此,我们应该专门编写一个方法来做这件事。根据刚刚的描述,一个名为checkStatus的方法诞生了:

  1. // 检查状态。在获取通道的时候,通道管理器应处于已初始化状态。
  2. // 如果通道管理器未处于已初始化状态,那么本方法将会返回一个非nil的错误值。
  3. func (chanman *myChannelManager) checkStatus() error {
  4. if chanman.status == CHANNEL_MANAGER_STATUS_INITIALIZED {
  5. return nil
  6. }
  7. statusName, ok := statusNameMap[chanman.status]
  8. if !ok {
  9. statusName = fmt.Sprintf("%d", chanman.status)
  10. }
  11. errMsg :=
  12. fmt.Sprintf("The undesirable status of channel manager: %s!\n",
  13. statusName)
  14. return errors.New(errMsg)
  15. }

细心的读者可能会发现,我们在checkStatus方法中用到了标识符statusNameMap。从语法上看,它代表的应该是一个字典类型的值。确实,我们在goc2p项目的代码包webcrawler/middleware中有这样一个声明:

  1. // 表示状态代码与状态名称之间的映射关系的字典。
  2. var statusNameMap = map[ChannelManagerStatus]string{
  3. CHANNEL_MANAGER_STATUS_UNINITIALIZED: "uninitialized",
  4. CHANNEL_MANAGER_STATUS_INITIALIZED: "initialized",
  5. CHANNEL_MANAGER_STATUS_CLOSED: "closed",
  6. }

声明这个字典的主要目的是在需要时以更加易读的方式反映出通道管理器的状态。文字(单词、短语等)要比单纯的数字更容易让人理解。

有了checkStatus方法,ReqChan方法的职责就足够单一和清晰了,并且可以很容易地编写出来:

  1. func (chanman *myChannelManager) ReqChan() (chan base.Request, error) {
  2. chanman.rwmutex.RLock()
  3. defer chanman.rwmutex.RUnlock()
  4. if err := chanman.checkStatus(); err != nil {
  5. return nil, err
  6. }
  7. return chanman.reqCh, nil
  8. }

再次强调,在获取通道的过程中,施加rwmutex的读锁是非常有必要的。这可以保证通道管理器中的那些通道类型的字段的状态一致性。

ReqChan方法为参照,读者可以自己实现myChannelManager类型的指针方法RespChanItemChanErrorChan。这非常容易。

5. 获取摘要信息

通道管理器的Summary方法应该返回反映其内部状态的摘要信息。这个摘要信息应该足以让我们了解到通道管理器的所有的运行时细节。

为了实现这样一个方法,我们首先要编写出一个摘要信息的模板。其内容应该就像一道填空题那样。这道题的题干应该留出一些空当以等待填入通道管理器的状态以及其中各类通道实例的实时长度和固定容量。我们把包含上述内容的文字以fmt代码包中的函数能够识别的格式书写出来并赋给一个全局变量:

  1. var chanmanSummaryTemplate = "status: %s, " +
  2. "requestChannel: %d/%d, " +
  3. "responseChannel: %d/%d, " +
  4. "itemChannel: %d/%d, " +
  5. "errorChannel: %d/%d"

在确定了摘要信息的模板之后,我们在*myChannelManager类型的Summary方法中对它进行填空即可:

  1. func (chanman *myChannelManager) Summary() string {
  2. summary := fmt.Sprintf(chanmanSummaryTemplate,
  3. statusNameMap[chanman.status],
  4. len(chanman.reqCh), cap(chanman.reqCh),
  5. len(chanman.respCh), cap(chanman.respCh),
  6. len(chanman.itemCh), cap(chanman.itemCh),
  7. len(chanman.errorCh), cap(chanman.errorCh))
  8. return summary
  9. }

其中,我们通过使用字典statusNameMap让摘要信息的可读性更好了。

6. 其他方法

至此,我们离让*myChannelManager类型成为接口类型的实现的目标只有一步之遥。我们还需要为myChannelManager类型添加两际个指针方法,即ChannelLen方法和Status方法。相比于前面讲述的方法,这两个方法的实现就太简单了。它们只需分别返回myChannelManager类型中的相应字段的值即可。虽然如此简单,但如果没有了它们,NewChannelManager函数照样通不过编译。

到这里,我们就完成了对通道管理器的一个实现的编写。虽然它所管理的各类通道自身具有并发安全性,但是我们仍然使用了读写锁。很显然,这个读写锁的作用目标是通道管理器本身。它保证了通道管理器的内部状态的一致性。当然,这也有它的字段status的功劳。

通道管理器管理着各类通道。这使得调度器可以轻易地在各个处理模块之间搬移数据。不过,这些容量固定的通道有时候也会带来麻烦。这需要我们在调度器的实现中加入一些特殊的处理。这在后面会加以论述。总之,这种将通道集中管理的方式有助于各组件代码的简洁、职责的清晰,同时还使各个通道可以被热替换。

9.5.2 实体池

我们为实体池声明的接口类型Pool中包含了4个方法。其中,方法TakeReturn的功能分别是“取出”实体和“归还”实体。而方法TotalUsed则是被用于向外部提供实体池的真实状态。

我们在前面已经实现过Goroutine票池。它对实体池的实现有很高的参考价值。事实上,我们把前者的“票”改为“实体”即可。下面是具体的实现方法。

首先,我们要确定实体池的基本结构。与Goroutine票池相同,我们使用一个通道作为池中实体的容器。而与之不同的是通道的元素类型。我们在讲实体池接口的时候已经对此有所说明。因此,代表实体容器的字段应该是这样的:

  1. container chan Entity // 实体容器。

我们已经知道,Entity是一个接口类型。同时它也是实体池中各实体的静态类型。但是,我们只知道实体的静态类型是不够的。我们还需要明确它们的动态类型(也即实际类型)。这样才能够去检查将要入池的实体的类型是否符合要求。

由于Go语言不支持自定义泛型,所以我们不能很自然地约束和检查实体的类型。幸好标准库代码包reflect中的API可以帮助我们满足需求。我们在前面已经接触过reflect包。其中的Type类型可以用来表示某个类型。所以,我们在实体池的基本结构中加入一个这样的字段以表示实体的实际类型:

  1. etype reflect.Type // 池中实体的类型。

我们在本书中声明的大多数类型都有着自己的初始化方式。实体池中的各个实体的实际类型可以是任意的,只要它实现了Entity接口。实体池很难也不应该去关注所有这些类型的每一种初始化方式,即使这些方式都非常简易。这就需要我们引导实体池的使用者自己向实体池提供池中实体的初始化方法。这样,实体池在初始化自身的过程中就可以使用此方法创建和初始化所需的实体值了。

由于Go语言视函数为一等类型,所以我们可以直接把一个函数类型作为实体池的字段的类型,如下:

  1. genEntity func() Entity // 池中实体的生成函数。

字段genEntity的值将会代表池中实体的初始化方法。它完全由实体池的使用者提供。我们会在后面看到使用它的方式和时机。

实体池的第四个字段是total。这个uint32类型的字段的值会忠实地反映出使用者对实体池容量的期望。这一期望应该总是能够被实现。

综上所述,我们可以编写出实体池的实现类型的基本结构:

  1. // 实体池的实现类型。
  2. type myPool struct {
  3. total uint32 // 池的总容量。
  4. etype reflect.Type // 池中实体的类型。
  5. genEntity func() Entity // 池中实体的生成函数。
  6. container chan Entity // 实体容器。
  7. }

实体池也需要自己的初始化方式。为此,我们编写了NewPool函数。其声明如下:

  1. // 创建实体池。
  2. func NewPool(
  3. total uint32,
  4. entityType reflect.Type,
  5. genEntity func() Entity) (Pool, error) {
  6. // 省略若干条语句
  7. }

可见,myPool类型中的绝大部分字段的值都由这个函数的参数给定。唯一例外的是container字段。对于外部给定的值,我们应该实行严格校验、及时报错的策略。具体如下。

  • 参数total的类型使得它的值不可能是负整数。但是,0也是不符合要求的。为此,我们需要这样来检查它:
  1. if total == 0 {
  2. errMsg :=
  3. fmt.Sprintf("The pool can not be initialized! (total=%d)\n", total)
  4. return nil, errors.New(errMsg)
  5. }

当发现参数total的值为0时,我们及时中止了对实体池的初始化,并直接生成和返回相应的错误值。

  • 我们并不需要急于检查参数entityTypegenEntity的值。因为我们紧接着会在对实体池的container字段的初始化的过程中用到它们。这同样会保证我们刚刚所说的检查策略的实施。

下面我们就来说说怎样初始化container字段。在通常情况下,我们应该在初始化一个池的时候就把它填满。尤其是在实体的创建成本很低或需要提前准备以尽量减少后续操作的响应时间的情况下。

既然是这样,我们就应该在创建和初始化一个chan Entity类型值之后立刻填满它:

  1. size := int(total)
  2. container := make(chan Entity, size)
  3. for i := 0; i < size; i++ {
  4. newEntity := genEntity()
  5. if entityType != reflect.TypeOf(newEntity) {
  6. errMsg :=
  7. fmt.Sprintf("The type of result of function genEntity() is NOT %s!\n",
  8. entityType)
  9. return nil, errors.New(errMsg)
  10. }
  11. container <- newEntity
  12. }

在上述代码中,我们使用genEntity参数的值生成实体值,然后把它们陆续发送给container。不过,在进行发送操作之前,我们还需要检查由genEntity生成的实体值的实际类型是否符合要求。这里所说的要求也是由使用者给定的。它由entityType参数的值代表。一旦发现新实体值的实际类型与entityType参数的值所代表的类型不一致,我们就会立即忽略掉后续的初始化工作并直接生成和返回相应的错误值给NewPool函数的调用方。

如果执行了这些代码之后仍未出错,那么我们就可以认定调用方给出的这些参数值都是合法的,并可以放心大胆地使用了。在这之后,我们创建一个*myPool类型值,然后将它作为NewPool函数的第一个结果值:

  1. pool := &myPool{
  2. total: total,
  3. etype: entityType,
  4. genEntity: genEntity,
  5. container: container,
  6. }
  7. return pool, nil

读者把上面这3段代码按照出现顺序拼接起来,就可以得出NewPool函数的函数体了。注意,我们最后返回的第一个结果值是*myPool类型的。读者应该知道这意味着什么。

在明确了myPool类型的基本结构及其初始化方式之后,我们就可以来编写接口类型Pool中声明的那4个方法了。注意,在myPool类型中,这4个方法都应该是指针方法,而不应该是值方法。

首先,Take方法的实现是相当简单的。我们只需要从container字段代表的通道处接收一个元素值(即实体值)并返回给方法的调用方即可。不过,Take方法的结果声明有两个。第二个结果是error类型的。既然存在第二个结果,那么我们就要充分利用。我们可以把接收表达式的结果赋给两个变量。如果第二个结果的值为false,那么就说明container通道已经被关闭。这时,Take方法就应该直接返回表示此问题的错误值。虽然这种情况几乎不可能发生,但是我们仍然有必要对它进行检测和处理。Take方法的完整声明如下:

  1. func (pool *myPool) Take() (Entity, error) {
  2. entity, ok := <-pool.container
  3. if !ok {
  4. return nil, errors.New("The inner container is invalid!")
  5. }
  6. return entity, nil
  7. }

相比之下,Return方法的实现要复杂一些。这是因为,该方法是要将方法调用者传入的Entity类型值发送给container通道。在进行发送操作之前,Return方法必须对该值进行严格的检查。该检查至少分为两步。第一步就是非nil检查。一个有效的实体值必定不为nil。否则,Return方法不予以处理,然后生成并返回一个表示此问题的错误值。如果此步通过,那么就需要检查该值的实际类型是否与实体池的etype字段代表的类型一致。这非常重要。因为Return方法的调用方可能会有意或无意地将一个类型不符的值传递进来。在这种情况下,Return方法应该不予以处理并及时报错。这一步骤保证了实体池中的实体的类型一致性,同时也确保了它的Take方法的调用方能够得到预期类型的值。这两步检查的代码如下:

  1. if entity == nil {
  2. return errors.New("The returning entity is invalid!")
  3. }
  4. if pool.etype != reflect.TypeOf(entity) {
  5. errMsg := fmt.Sprintf("The type of returning entity is NOT %s!\n", pool.etype)
  6. return errors.New(errMsg)
  7. }

如果调用方传入的Entity类型值通过了上述两个步骤的检查,那么就说明该值是符合要求的。但是,我们的检查却并未结束。要知道,我们无法限制外部对实体池的Return方法的调用次数。并且,即使一个Entity类型值并不是实体池最初在内部生成的,它也仍然可以被传递给Return方法并通过前面的检查。所以,我们还需要考虑一种情况:如果被传给Return方法的值不是最初在NewPool函数中生成的,那么我们应该怎样处理它。如果要阻止这样的值被发送到container通道,那么我们又该怎样去鉴别它们呢?

还记得吗?Entity接口类型中那个唯一的方法声明是:

  1. Id() uint32

该方法会返回一个能够唯一标识池中实体值的整数。我们可以记录下池中实体值的Id方法的结果值,并利用该值来鉴别某个Entity类型值是否是池中原有的实体值。为了记录下这些ID,我们需要再在myPool类型的基本结构中加入一个字段。该字段的声明如下:

  1. idContainer map[uint32]bool // 实体ID的容器。

字典idContainer被用来存储池中实体的ID。我们可以通过这样一个字典快捷地辨别一个Entity类型值的有效性(即是否是池中的实体)。

若要让它可用,我们需要在NewPool函数中对它进行初始化。这里所说的初始化并不仅仅指使用make函数创建并初始化一个字典值并将其赋给idContainer字段,还意味着要保持它与container通道的同步。为此,我们要对该函数中的这段代码:

  1. size := int(total)
  2. container := make(chan Entity, size)
  3. for i := 0; i < size; i++ {
  4. newEntity := genEntity()
  5. // 省略若干条语句
  6. container <- newEntity
  7. }

进行修改。这包括创建和初始化一个map[uint32]bool类型的字典值,以及在把新实体newEntity发送给container通道之后立即将该实体的ID作为键添加到之前创建的字典值中。因此,前面那段代码就被改为:

  1. size := int(total)
  2. container := make(chan Entity, size)
  3. idContainer := make(map[uint32]bool)
  4. for i := 0; i < size; i++ {
  5. newEntity := genEntity()
  6. // 省略若干条语句
  7. container <- newEntity
  8. idContainer[newEntity.Id()] = true
  9. }

此外,我们还需要在最后初始化myPool类型值的时候,把这里的idContainer变量赋给该值的idContainer字段:

  1. pool := &myPool{
  2. total: total,
  3. etype: entityType,
  4. genEntity: genEntity,
  5. container: container,
  6. idContainer: idContainer,
  7. }

在做好上述准备之后,可以编写*myPoolReturn方法中的第三步检查了。其代码如下:

  1. if _, ok := pool.idContainer[entity.Id()]; !ok {
  2. errMsg := fmt.Sprintf("The entity (id=%d) is illegal!\n", entity.Id())
  3. return errors.New(errMsg)
  4. }

即使Return方法接受的参数值不为nil、类型符合要求并且也是由NewPool函数中的代码创建并发送给当前实体池的container通道的,这个值也不一定就应该被归还。我们应该考虑到同一个Entity类型值可能会被归还多次的情况。一个Entity类型值仅仅在已被取出并且还未被归还的情况下才应该作为Return方法的参数值。反过来讲,只有这时,Return方法才应该认可并处理它。

为了做到这一点,我们需要更加充分地利用idContainer字段。我们已经知道,该字段代表的字典的元素类型是bool类型的。利用这些字典元素的值,我们就可以表示与之对应的ID所属的实体的状态。true代表未被取出或已被归还,false代表已被取出且未被归还。根据这一定义,我们对Return方法的参数entity的第四步检查的代码如下:

  1. if v := pool.idContainer[entity.Id()]; v {
  2. errMsg := fmt.Sprintf("The entity (id=%d) is already in the pool!\n", entity.Id())
  3. return errors.New(errMsg)
  4. }

可以看到,我们在这里依然使用的是针对idContainer字典的索引表达式。因此,我们可以把第三步和第四步检查合并一下:

  1. entityId := entity.Id()
  2. v, ok := pool.idContainer[entityId]
  3. if !ok {
  4. errMsg := fmt.Sprintf("The entity (id=%d) is illegal!\n", entityId)
  5. return errors.New(errMsg)
  6. }
  7. if v {
  8. errMsg := fmt.Sprintf("The entity (id=%d) is already in the pool!\n", entityId)
  9. return errors.New(errMsg)
  10. }

在完成了这4步检查之后,我们就可以放心地把entity发送给container通道了。不过别忘了,为了让最后一步的检查正确无误,我们同时还要修改idContainer字典中对应的键值对。相应的代码如下:

  1. pool.idContainer[entityId] = true
  2. pool.container <- entity
  3. return nil

最后那条return nil语句表示相应实体已被归还给实体池。

对于Take方法,我们也需要做一些改动,为的是正确表示池中实体的已被取出且还未被归还的状态。改造后的Take方法如下:

  1. func (pool *myPool) Take() (Entity, error) {
  2. entity, ok := <-pool.container
  3. // 省略若干条语句
  4. pool.idContainer[entity.Id()] = false
  5. return entity, nil
  6. }

至此,通过myPool类型的idContainer字段,我们可以判断出一个Entity类型值是否是池中之物,以及它是否已被取出且未被归还。这使得实体池中的每个实体都有了自己的状态,也使得Return方法中对参数值的检查更加严谨了。不过,这又引入了一个新的问题。myPool类型的container字段是通道类型的。它本身就是并发安全的,所以无论我们怎样操作它都不用顾及同步的问题。而myPool类型的idContainer字段是字典类型的。Go语言的字典类型自身并未对并发安全性做出任何保证。那么,我们需要使用额外的同步方法来保证这一点吗?显然,答案是肯定的。

我们使用互斥锁来解决这个问题。这个互斥锁需要保证两处代码的串行执行。它们分别在Take方法和Return方法中。为了实施这项改造,我们需要先为myPool类型添加如下字段:

  1. mutex sync.Mutex // 针对实体ID容器操作的互斥锁。

对于Take方法,由于需要被保护的语句有一条,且它紧挨在最后的return语句之前,所以我们直接在需要被保护的语句的前面添加相应的互斥锁操作即可。改造后的Take方法如下:

  1. func (pool *myPool) Take() (Entity, error) {
  2. entity, ok := <-pool.container
  3. if !ok {
  4. return nil, errors.New("The inner container is invalid!")
  5. }
  6. pool.mutex.Lock()
  7. defer pool.mutex.Unlock()
  8. pool.idContainer[entity.Id()] = false
  9. return entity, nil
  10. }

方法Return中的情况要相对复杂一些。因为其中包括了对idContainer字典的读操作和写操作,并且写操作是否需要被执行,取决于读操作的结果值。具体代码如下:

  1. v, ok := pool.idContainer[entityId]
  2. if !ok {
  3. errMsg := fmt.Sprintf("The entity (id=%d) is illegal!\n", entityId)
  4. return errors.New(errMsg)
  5. }
  6. if v {
  7. errMsg := fmt.Sprintf("The entity (id=%d) is already in the pool!\n", entityId)
  8. return errors.New(errMsg)
  9. }
  10. pool.idContainer[entityId] = true

注意,这段代码中的一些语句是不需要被保护的,比如errMsg变量的声明语句。

我们直接在这段代码的前面添加相应的互斥锁操作是不妥当的。因为在它之后且在最后那条return语句之前还有针对通道container的发送操作。这样做非常容易锁死相关的Goroutine,并造成爬取流程的停滞不前。

正确的做法是:把需要保护的语句抽离出来并放到一个独立的方法中,以便于我们尽量缩小临界区的大小,以及方便使用defer语句来保证互斥锁的释放。具体步骤是,先在独立的方法中进行针对idContainer字典的读操作,然后根据读操作的结果来决定是否执行写操作。并且,创建出可以表示这一个读写过程的结果的值,并返回给该独立方法的调用方(也就是Return方法)。在Return方法中,我们根据该独立方法的结果值作出选择,是生成并返回相应的错误值还是把给定的实体发送给container通道。

依照这个思路,我们可以编写出如下方法:

  1. // 比较并设置实体ID容器中与给定实体ID对应的键值对的元素值。
  2. // 结果值:
  3. // -1:表示键值对不存在。
  4. // 0:表示操作失败。
  5. // 1:表示操作成功。
  6. func (pool *myPool) compareAndSetForIdContainer(
  7. entityId uint32, oldValue bool, newValue bool) int8 {
  8. pool.mutex.Lock()
  9. defer pool.mutex.Unlock()
  10. v, ok := pool.idContainer[entityId]
  11. if !ok {
  12. return -1
  13. }
  14. if v != oldValue {
  15. return 0
  16. }
  17. pool.idContainer[entityId] = newValue
  18. return 1
  19. }

对于compareAndSetForIdContainer方法,无需再做解释。该方法的注释已经说得很清楚了。

接下来,我们需要对Return方法进行相应的改造。在其中的从entityId := entity.Id()语句后面到该方法体结束的那段代码应变为:

  1. casResult := pool.compareAndSetForIdContainer(entityId, false, true)
  2. if casResult == 1 {
  3. pool.container <- entity
  4. return nil
  5. } else if casResult == 0 {
  6. errMsg := fmt.Sprintf("The entity (id=%d) is already in the pool!\n", entityId)
  7. return errors.New(errMsg)
  8. } else {
  9. errMsg := fmt.Sprintf("The entity (id=%d) is illegal!\n", entityId)
  10. return errors.New(errMsg)
  11. }

其中,把针对container通道的发送操作提到靠前的位置,是为了能够在条件满足的情况下尽快地归还实体。

现在,实体池类型myPool的“取出”和“归还”功能已经基本完善了。不过,我们还需要让*myPool类型成为Pool接口的实现类型。它还缺少两个方法,即TotalUsed

显然,有了total字段和len函数,实现这两个方法会非常容易。所以我们在这里就不再赘述了。虽然简单,但是它们的作用还是重要的,尤其对于网络爬虫框架的监控机制来说。有了它们,我们就可以实时地了解到实体池的使用情况了。

在实现了myPool类型的指针方法TotalUsed之后,我们可以使用go build命令编译webcrawler/middleware代码包。如果没有出现任何编译错误,那么就说明实体池接口及其实现在代码层面上已经完成并正确了(别忘了,与通道管理器相关的代码也在该代码包中,所以也要保证它们正确无误才能通过编译)。不过,它们是否可以按照我们的意愿运行起来,还要看对应的功能测试的结果。对于这些功能测试,我们在这里忽略不讲,读者可以试着去实现它。

9.5.3 停止信号

停止信号的接口类型StopSign中包含的方法声明不少。不过,代表核心操作的方法只有3个,即SignSignedDeal。它们分别代表了发出信号、判断信号和处理信号的功能。它们体现了停止信号的本意。

我们把停止信号的实现类型的命名为myStopSign。构思它的基本结构首先要考虑的就是被用来承载停止信号的字段。我们在讲述其接口类型的时候说过,它不适合用通道类型来代表。因为这会给停止信号的发出方和处理方都带来很大的约束。实际上,一个停止信号只可能有两个状态:未发出和已发出。所以,我们使用一个bool类型的字段就完全可以表示信号本身和停止信号类型的实例的状态了:

  1. signed bool // 表示信号是否已发出的标志位。

字段signed的初始值即是其类型的零值。当停止信号的Sign被调用时,我们就把true赋予该字段。而当Reset被调用时,我们就把该字段的值重新赋为false

接口类型StopSign的声明表明,停止信号需要为每一个处理方的处理次数进行计数。每一个处理方都会有一个唯一的标识。在调用停止信号的Deal方法的时候,它们应该将相应的标识作为参数值传入。由于这个标识是唯一的,所以我们可以使用一个字典来满足计数的需求。因此,我们为myStopSign类型加入这样一个字段:

  1. dealCountMap map[string]uint32 // 处理计数的字典。

与实体池的情况类似,我们应该会在一些方法中同时操作多个字段,并且想把处在同一个方法中的多个操作合为一个原子操作。为此,我们应该使用互斥锁来保证相关操作的并发安全性和原子性。我们对于上面这两个字段都有读操作和写操作的需求,比如,停止信号状态的变更和读取,以及处理计数的累加和读取。因此,我们应该使用读写锁来保护这些操作。这样,就有了myStopSign类型的第三个字段:

  1. rwmutex sync.RWMutex // 读写锁。

有了基本结构,我们就可以开始编写被用来初始化停止信号的NewStopSign函数了。该函数的声明如下:

  1. // 创建停止信号。
  2. func NewStopSign() StopSign {
  3. ss := &myStopSign{
  4. dealCountMap: make(map[string]uint32),
  5. }
  6. return ss
  7. }

注意,我们在初始化myStopSign类型值得时候,只为其中的一个字段赋了值。这是因为其他两个字段的零值就已经是可用的了。

我们接下来编写停止信号最核心的3个方法。这看起来并不难。Sign方法需要做的最重要的事就是把true赋给signed字段。而Signed方法的职责就是把signed字段的值真实的返回调用方。至于Deal方法,则是要根据参数code的值在dealCountMap字典中添加或修改相应的键值对。注意,对于Sign方法和Deal方法来说,在某种情况下是需要忽略实际的操作的。

先来看Sign方法。我们知道,网络爬虫框架中的各个组件在接收到停止信号之后,应该立即做好当前任务的善后处理并取消后续的所有任务。它们对停止信号的响应都是一次性的,同时也都是不可逆的。我们可以通过停止并再次启动调度器来重新创建和初始化网络爬虫框架内的所有组件。但是,这些组件都会是全新的实例,而那些在停止调度器之前还有效的那些组件实例都会被丢弃。正因为如此,我们没有必要发出停止信号多次。当然,如果停止信号已被重置,那就是另外一回事了。这一特性在Sign方法上的反映就是:当发现signed字段的值已经为true的时候,就应该忽略后续的对signed的赋值操作。其实,要不是为了准确地设定Sign方法的结果值以表示当次调用的成功与否,我们不检查signed字段的值而直接为它赋值也是完全没有问题的。综上所述,Sign方法的声明如下:

  1. func (ss *myStopSign) Sign() bool {
  2. ss.rwmutex.Lock()
  3. defer ss.rwmutex.Unlock()
  4. if ss.signed {
  5. return false
  6. }
  7. ss.signed = true
  8. return true
  9. }

之所以在实现如此简单的方法中加入对读写锁的操作,是因为我们不希望在同时发生多次调用的时候出现多于一个的结果值为true的情况。换句话说,无论在什么情况下,我们都希望只有一个发出停止信号的调用被真正处理。因此,对Sign方法中的代码的执行应该是互斥的。

与之形成鲜明对比的是Signed方法。在后者中,我们只需读取signed字段的值并将其作为方法的结果值,而无需添加任何前置和后置的检查和操作。因此,其中的代码也就没有互斥执行的需求。Signed方法的方法体只包含了一行代码,如下:

  1. func (ss *myStopSign) Signed() bool {
  2. return ss.signed
  3. }

再来说Deal方法。它的主要操作对象是dealCountMap字段。这势必会用到读写锁。另外,若发现停止信号还未被发出,则后续操作理应被忽略,否则就会导致处理计数的不准确。该方法的声明如下:

  1. func (ss *myStopSign) Deal(code string) {
  2. ss.rwmutex.Lock()
  3. defer ss.rwmutex.Unlock()
  4. if !ss.signed {
  5. return
  6. }
  7. if _, ok := ss.dealCountMap[code]; !ok {
  8. ss.dealCountMap[code] = 1
  9. } else {
  10. ss.dealCountMap[code] += 1
  11. }
  12. }

我们要说的停止信号的第4个方法就是Reset方法。这个方法很有用。其重要性仅次于前述的3个方法。我们可以通过调用它来重置停止信号。它是让停止信号从已发出状态转换为未发出状态的唯一方法。

方法Reset的实现也很简单。我们只需要把signed赋值为false,并清零所有的处理计数。也正因为包含了这两项操作,所以我们需要使用读写锁将它们保护起来。其声明如下:

  1. func (ss *myStopSign) Reset() {
  2. ss.rwmutex.Lock()
  3. defer ss.rwmutex.Unlock()
  4. ss.signed = false
  5. ss.dealCountMap = make(map[string]uint32)
  6. }

可以看到,我们在这里直接丢弃了dealCountMap字段的原有值,并为它绑定了一个新的值。这也是清零所有处理计数的最便捷的方式。

要想让*myStopSign类型成为StopSign接口类型的实现类型,我们还需要为其编写3个方法,即DealCountDealTotalSummary。这3个方法的实现都相当简单,所以我们只在这里提示一下要点。

我们应该使用rwmutex的读锁来保护DealCount方法和DealTotal方法中的代码,因为它们都需要从dealCountMap字典中获取相应的键值对。

至于Summary方法,我们就不用多说了。它的结果值应该忠实地反映出停止信号内部的即时状态,比如表示信号是否已发出的标志位和处理计数的集合。

好了,我们对停止信号的实现的介绍就暂时告一段落。读者在后面会看到,调度器会使用停止信号来实现网络爬虫框架的停止功能。

9.5.4 ID生成器

在本节的最后,我们再来说说网络爬虫框架的中间件工具集中方法最少的那个工具——ID生成器。

ID生成器的接口类型IdGenerator中只包含了一个方法声明:

  1. GetUint32() uint32 // 获得一个uint32类型的ID。

不过,为了实现这个接口,我们需要添加多个字段来支持。首先,需要明确的是,ID是非负整数且是递增的。也就是说,我们第一次调用一个ID生成器的GetUint32方法时会得到uint32类型的值0,而第二次调用则应该得到1,以此类推。因此,我们应该用一个字段来表示当前的ID的值,像这样:

  1. sn uint32 // 当前的ID。

其次,虽说uint32类型可以被用来表示近43亿个非负整数,但是万一我们对ID生成器的GetUint32方法的调用次数超过了这个数量那又该怎么办呢?这里的解决方案很简单——从头开始。也就是说,当发现前一个ID已是uint32类型所能表示的最大值的时候,我们就把当前的ID重新赋值为0。为了能够识别这种情况,我们还需这样一个字段:

  1. ended bool // 前一个ID是否已经为其类型所能表示的最大值。

最后,与大多数中间件工具一样,我们需要使用同步方法来保证其并发安全性。由此,我们需要声明的第3个字段如下:

  1. mutex sync.Mutex // 互斥锁。

这3个字段有一个共同的特点:它们的类型的零值即是我们需要分别赋给它们的值。所以,被用于创建并初始化ID生成器的函数NewIdGenerator的函数体中只需包含一条语句:

  1. return &cyclicIdGenerator{}

由于该函数的唯一结果的类型是IdGenerator,所以我们要保证cyclicIdGenerator的指针类型即为接口类型IdGenerator的实现类型。这就意味着,我们需要实现的GetUint32方法应为cyclicIdGenerator类型的指针方法,而不是值方法。

我们需要在cyclicIdGenerator类型的指针方法GetUint32中做的最重要的一件事,就是我们刚才说的检查前一个ID是否已为最大值并做出相应处理。这包括两个步骤。其中的一个步骤就是在得到sn字段的值之后,立即判断它是否是uint32类型所能表示的最大值。如果答案是肯定,那么我们就把true赋给ended字段。另一个步骤是与前者呼应的。它会在获取到sn字段的值之前先检查ended字段的值。如果该值为true,那么它就会直接将sn字段的值重置为0,并将其返回给方法的调用方。这样一来,我们就可以持续不断地从ID生成器中取出可用的ID了。读者不用担心ID可能会重复的问题,因为我们之前编写的实体池所容许的最大容量也不会超过uint32类型的取值范围。下面是GetUint32方法的完整声明:

  1. func (gen *cyclicIdGenerator) GetUint32() uint32 {
  2. gen.mutex.Lock()
  3. defer gen.mutex.Unlock()
  4. if gen.ended {
  5. defer func() { gen.ended = false }()
  6. gen.sn = 0
  7. return gen.sn
  8. }
  9. id := gen.sn
  10. if id < math.MaxUint32 {
  11. gen.sn++
  12. } else {
  13. gen.ended = true
  14. }
  15. return id
  16. }

若读者把这里的ID生成器用在了别处并且确实会调用ID获取方法超过43亿次,那么建议对这里的ID生成器进行扩展。这应该很容易。我在这里提供一种扩展的思路。

我们把cyclicIdGenerator类型的扩展类型命名为cyclicIdGenerator2。既然后者是前者的扩展,那么后者的基本结构中就应该包含一个类型为前者的字段:

  1. // ID生成器的实现类型2。
  2. type cyclicIdGenerator2 struct {
  3. base cyclicIdGenerator // 基本的ID生成器。
  4. cycleCount uint64 // 基于uint32类型的取值范围的周期计数。
  5. }

我们可以看到,cyclicIdGenerator2的基本结构中还存在了第二个字段cycleCountcycleCount字段的值代表了一个周期计数。这个周期计数是对于基本的ID生成器而言的。换句话说,该计数值表示了基本ID生成器base所生成的ID已经第几次触碰到了uint32类型所能表示的最大值。我们利用这个计数值可以将从base处获取到的uint32类型的ID扩展为uint64类型的ID。请看下面的方法声明:

  1. func (gen *cyclicIdGenerator2) GetUint64() uint64 {
  2. var id64 uint64
  3. if gen.cycleCount%2 == 1 {
  4. id64 += math.MaxUint32
  5. }
  6. id32 := gen.base.GetUint32()
  7. if id32 == math.MaxUint32 {
  8. gen.cycleCount++
  9. }
  10. id64 += uint64(id32)
  11. return id64
  12. }

可以看到,GetUint64方法每次都会检查baseGetUint32方法返回的uint32类型的ID是否是uint32类型值的上限。如果答案是肯定的,那么就让cycleCount字段的值自增一次。

另外,之所以要在开始处把cycleCount字段的值以2取模,是因为uint64类型允许的最大值是uint32类型允许的最大值的两倍。每当cycleCount字段的值为2的倍数的时候,就说明GetUint64方法上一次返回的结果值已是uint64类型允许的最大值。不过,我们要关注的是已经超出uint32类型的取值范围但却仍在uint64类型的取值范围内的情况。这也是我们总是判断该取模操作的结果值是否等于1的原因。在此种情况下,把baseGetUint32方法返回的ID值与uint32类型允许的最大值相加,就可以得到一个64位的ID值了。并且,如此得到的64位的ID值依然是连续的。这时,uint32类型允许的最大值就相当于一个基数。

另一方面,在把cycleCount字段的值以2取模的结果值为0的情况下,我们并没有设置基数(相当于把基数也设置为0)。由于这时用uint64类型表示的ID值与用uint32类型表示的ID值是相同的,所以我们不用进行任何附加的操作。这也保证了cyclicIdGenerator2类型的指针方法GetUint64返回的ID值总会基于uint64类型的取值范围而循环。

如此,我们编写扩展的ID生成器cyclicIdGenerator2才得以正确地生成64位的ID值,并且其生成规则与原始的ID生成器的生成规则完全一致。

有了ID生成器,网页下载器和分析器都可以非常方便地为它们的实例生成全局唯一的ID。ID生成器自身是并发安全的。这也使得我们在使用它的时候不用有所顾虑。(请注意,扩展的ID生成器cyclicIdGenerator2并不是并发安全的,读者可以为它添加这一特性吗?)