9.6 处理模块的实现

有了前面设计并实现的那些中间件工具的支持,我们再来实现网络爬虫框架中的各个处理模块就会方便很多。在本节,我会带领读者逐一地实现这些处理模块。在这个过程中,读者也会体会出那些中间件工具带来的益处。所谓的“磨刀不误砍柴工”即是如此。

由于我们在对网络爬虫框架进行总体设计的时候,严格遵循了“单一职责”的原则,所以,这些处理模块在功能上都并不复杂。不过,在实现它们的时候仍然存在一些需要特别处理的地方。下面我们就从相对简单的网页下载器开始讲起。

9.6.1 网页下载器

在本小节,我们将会详细讲解网页下载器以及容纳其集合的池的实现方法。在这个过程中,读者也可以看到一些中间件工具的实际应用。

1. 网页下载器的实现

网页下载器的功能是向目标服务器发送给定的请求以获得相应的网页内容。我们在对网络爬虫框架进行详细设计的时候,提到过代表HTTP客户端的类型http.Client。它可以发送HTTP请求并接受HTTP响应。我们的网页下载器恰恰需要使用这样的工具来实现功能。因此,在我们的网页下载器的实现类型中应该包含这样一个字段:

  1. httpClient http.Client // HTTP客户端。

此外,每个网页下载器的实例都应该拥有一个仅属于自己的ID。所以下面的字段是必须的:

  1. id uint32 // ID。

在确定了基本结构之后,我们把网页下载器的实现类型命名为myPageDownloader

按照惯例,我们需要编写一个被用来获取新的网页下载器实例的NewPageDownloader函数。这个函数仅仅接受一个*http.Client类型的参数client。之所以把这个参数的类型设定为http.Client类型的指针类型,是因为我们允许该函数的调用方不指定它。也就是说,该参数接受nil的赋值。在这种情况下,函数内部会以默认方式创建一个HTTP客户端实例,并把它赋给那个新的myPageDownloader类型值的httpClient字段。

至于为新的网页下载器实例分配ID的工作,我们就不用劳烦NewPageDownloader函数了。因为我们已经编写好了ID生成器。为了持有一个网页下载器专用的ID生成器,我们需要在当前包(也就是goc2p项目的webcrawler/downloader代码包)的源码文件中声明如下的包级私有的全局变量:

  1. // ID生成器。
  2. var downloaderIdGenerator mdw.IdGenerator = mdw.NewIdGenerator()

其中,限定标识符mdw.NewIdGenerator的前缀mdw代表了代码包webcrawler/middleware。这样的表示可以成立,是由于我们在导入这个代码包的时候为该代码包起了别名mdw

在声明了变量downloaderIdGenerator之后,我们还需要声明这样一个函数:

  1. // 生成并返回ID。
  2. func genDownloaderId() uint32 {
  3. return downloaderIdGenerator.GetUint32()
  4. }

添加genDownloaderId的函数可以将网页下载器与ID生成器完全解耦。虽然mdw.IdGenerator已经代表一个接口了,但是一旦有了上述函数,就可以对网页下载器彻底隐藏ID的具体生成方式了。并且,这种把对外部API的调用代码集中于一处的方法,也会提高之后代码维护工作的效率。

好了,在进行了一番准备之后,我们就可以很方便地编写出NewPageDownloader函数的函数体了。该函数的完整声明如下:

  1. // 创建网页下载器。
  2. func NewPageDownloader(client *http.Client) PageDownloader {
  3. id := genDownloaderId()
  4. if client == nil {
  5. client = &http.Client{}
  6. }
  7. return &myPageDownloader{
  8. id: id,
  9. httpClient: *client,
  10. }
  11. }

关于变量idclient的赋值方式,我们都已说明。注意,在使用复合字面量创建并初始化http.Client类型值的时候,我们可以不为它的任何字段赋值。如此即是以默认的方式创建一个HTTP客户端实例的方式。

此外,读者应该能够通过其中的return语句看出,我们是想让*myPageDownloader类型成为PageDownloader接口的实现类型。若要满足这个需求,我们就需要为myPageDownloader类型编写两个指针方法——IDDownload。其中,ID方法相当简单。它只需原样返回该类型的id字段的值即可:

  1. func (dl *myPageDownloader) Id() uint32 {
  2. return dl.id
  3. }

方法Download会接受一个base.Request类型(即webcrawler/base代码包中的结构体类型Request)的参数,并返回一个base.Response的结果值和一个error类型的结果值。这个方法本身要做的工作并不复杂。我们把与目标服务器交换数据的工作都交给了httpClient字段代表的HTTP客户端。该HTTP客户端有一个名为Do的方法。该方法即可实现我们刚刚所说的数据交换工作。我们调用该方法并传入一个http.Request类型的值,待相关操作完成之后,就可以得到一个*http.Response类型的值和一个error类型的值了。前者代表了目标服务器针对当前请求的响应,而后者则代表了可能发生的错误。若后者不为nil,则说明方法Do在被执行的过程中出现了错误。这时,Download方法应该立即将该错误值反馈给它的调用方。请看下面的代码:

  1. httpResp, err := dl.httpClient.Do(httpReq)
  2. if err != nil {
  3. return nil, err
  4. }

其中,变量httpReqhttpResp即分别代表了请求和响应。我们在此前并没有声明过httpReq。不过我们马上就会说明。

我们已经知道,base.Request类型和base.Response类型分别相当于对http.Request类型和http.Response类型的一层薄薄的封装。HTTP客户端的Do方法所接受和返回的第一个值的类型分别为后两者,而网页下载器的Download方法所接受和返回的第一个值的类型分别为前两者。这就意味着,这个Download方法还要有一个简单但却重要的职责:完成相关的封装和拆封。我们在编写网络爬虫框架的基本数据类型base.Requestbase.Response时已经想到了这一点。因此,履行此职责只需编写相应的调用语句和复合字面量即可。

对于Download方法的参数req的值,调用其HttpReq方法即可获取到被包含在内的HTTP请求。变量httpReq就是这样被声明和赋值的:

  1. httpReq := req.HttpReq()

再来说对于Download方法的第一个结果值的生成。如果HTTP客户端的Do方法的第二个结果值(那个error类型值)等于nil,那么就说明它与目标服务器的交互操作已经成功地完成了。这时,我们就需要这样来创建并初始化一个base.Response类型值并把它返回给Download方法的调用方:

  1. return base.NewResponse(httpResp, req.Depth()), nil

我们现在来总结一下。只要我们有了一个HTTP客户端,就可以用它来创建和初始化网页下载器。这个工作由webcrawler/downloader包中的NewPageDownloader函数来完成。NewPageDownloader函数会使用同包中的genDownloaderId函数为其生成的网页下载器创建一个ID,并且还会在我们传入的HTTP客户端不可用时,自己生成一个以确保该网页下载器的可用性。网页下载器的Download方法为实现相应功能会进行3个步骤的操作。首先,它会获取到base.Request类型的参数值中的HTTP请求。其次,它会利用HTTP客户端与目标服务器交互,并得到一个HTTP响应和一个可能不为nil的错误值。最后,Download方法会根据这两个交互结果来生成它自己的结果值。

网页下载器的功能非常简单,但其角色却非常关键。因为网络爬虫要分析的所有内容都会从网页下载器那里获得。在大多数情况下,我们是不会满足于只有一个网页下载器在工作的。这是由于与其他环节相比,网络通讯所需的时间会多很多。它们的耗时往往不在一个数量级上。因此,我们就需要更多的网页下载器并发地与目标服务器交换数据。这就需要以某种方式来创建和管理一个网页下载器的集合。

2. 网页下载器池的实现

显然,我们会使用网页下载器池来做这件事。在9.4节中,我们已经给出了网页下载器池的接口类型。另外,我们之前还实现了一个实体池。它可以作为网页下载器池的内部实现的一部分。实际上,我们再对实体池稍加封装就可以得到一个网页下载器池的实现了。

首先请看网页下载器池的实现类型myDownloaderPool的基本结构:

  1. // 网页下载器池的实现类型。
  2. type myDownloaderPool struct {
  3. pool mdw.Pool // 实体池。
  4. etype reflect.Type // 池内实体的类型。
  5. }

我们可以看到,mdw.Pool类型(即webcrawler/middleware代码包中的Pool类型)的字段pool已位列其中。除此之外,字段etype也是很有用的。它被用来表示实体池内的实体的实际类型。我们在实现myDownloaderPool类型的Take方法的时候会用到它。

按照惯例,函数NewPageDownloaderPool的用途是创建和初始化网页下载器池。下面是它的声明:

  1. // 创建网页下载器池。
  2. func NewPageDownloaderPool(
  3. total uint32,
  4. gen GenPageDownloader) (PageDownloaderPool, error)

该函数接受两个参数。第一个参数即代表了池的总容量,而第二个参数则为网页下载器的生成函数。其中的GenPageDownloader类型的声明如下:

  1. // 生成网页下载器的函数类型。
  2. type GenPageDownloader func() PageDownloader

有了这两个参数的值,我们就可以创建并初始化一个实体池的实例了。这需要调用mdw.NewPool并传入必要的参数值。

函数mdw.NewPool接受3个参数。首先是代表其总容量的uint32类型值,其次是代表其中实体的类型的reflect.Type类型值,最后是代表了实体生成函数的func() Entity类型值。对于第一个参数的值,我们可以直接使用NewPageDownloaderPool函数接受的第一个参数值来赋予。而对于后两个参数的值,我们就需要依据NewPageDownloaderPool函数接受的第二个参数值转换得出了。

更确切地说,我们会使用实验法得出mdw.NewPool函数需要的第二个参数值。请看下面的代码:

  1. etype := reflect.TypeOf(gen())

我们先调用了一次网页下载器生成函数gen,并使用reflect.TypeOf函数来获取前者返回的结果值。这里存在一条约定俗成的规则:无论gen函数被调用多少次,它返回的结果值的实际类型都应该是一致的。因此,我们通过这样的方式就可以确定传给mdw.NewPool函数的第二个参数值了。

函数NewPageDownloaderPool的第二个参数是GenPageDownloader类型的,而mdw.NewPool函数的第三个参数的类型字面量却是func() Entity。显然,它们是不同的。我们不能直接使用前者的值为后者赋值。不过,我们却可以通过对前者的简单封装而得到后者:

  1. genEntity := func() mdw.Entity {
  2. return gen()
  3. }

这完全得益于Go语言视函数类型为一等类型的特性。

注意,由于PageDownloader接口类型包含了mdw.Entity接口类型中声明的所有方法,所以类型为前者的值也可以被视为后者的值。这符合Go语言的编译规则。

经过上述准备之后,我们就可以调用mdw.NewPool函数了:

  1. pool, err := mdw.NewPool(total, etype, genEntity)

如果这里的变量err的值不为nil,那么就说明mdw.NewPool无法根据给定的参数值初始化一个实体池。我们应该及时把它传递给该方法的调用方:

  1. if err != nil {
  2. return nil, err
  3. }

如果变量err的值的为nil,那么就意味着初始化myDownloaderPool类型值的充分条件已经具备。这时,我们仅使用之前得到的那些值就可以轻松地完成这项任务:

  1. dlpool := &myDownloaderPool{pool: pool, etype: etype}

由于我们希望把*myDownloaderPool类型作为PageDownloaderPool接口类型的实现类型,所以这里应该直接把dlpool变量的值作为NewPageDownloaderPool函数的第一个结果值返回。为了让这条返回语句能够顺利地通过编译,我们就需要为myDownloaderPool类型编写PageDownloaderPool接口类型中声明的所有方法。并且,这些方法都应该是指针方法。

首先,我们来编写Take方法。该方法的声明如下:

  1. func (dlpool *myDownloaderPool) Take() (PageDownloader, error)

在它的方法体中,我们首先需要调用dlpool的字段poolTake方法。这是返回一个网页下载器的第一步。相关代码如下:

  1. entity, err := dlpool.pool.Take()

在为entityerr这两个变量赋值之后,我们会进行两项检查。第一项检查就是判断err变量的值是否为nil。与NewPageDownloaderPool函数中的做法相同,若此值不为nil就要及时上报。我们在讲解实体池的实现类型的时候,说明过在怎样的情况下它的Take方法的第二个结果值会是非nil的。虽然就目前对它的使用方式来看,这种情况是不可能出现的,但是我们在这里的检查仍然是有必要的。这会为今后的代码变更添加一层保障。

如果实体池poolTake方法没有返回非nil的错误值,那么我们就要紧接着进行第二项检查。与上一项检查相同,如果实体池工作正常,那么该项检查必定会通过。否则,我们就必须忽略余下的所有操作并返回错误值。第二项检查的代码如下:

  1. dl, ok := entity.(PageDownloader)
  2. if !ok {
  3. errMsg := fmt.Sprintf("The type of entity is NOT %s!\n", dlpool.etype)
  4. panic(errors.New(errMsg))
  5. }

可以看到,此项检查针对的是从实体池中获取的实体的类型。在初始化实体池的时候,我们给予它了一个通过实验得出的代表了实体类型的值,并且还把该值赋给了网页下载器池的etype字段。实体池会保证它的Take方法总是按照我们的意愿返回类型与之相应的值。而在作为又一层封装的网页下载器池中,我们只需检查它的Take方法返回的结果值的类型否是与接口类型PageDownloader相匹配。显然,这一点已由NewPageDownloaderPool函数对其参数的类型约束保证了。也正因为如此,一旦发现类型不匹配的情况,就会立即引发一个运行时恐慌。

最后,若没有发生任何异常情况,我们就可以把最终得到的那个PageDownloader类型值(由变量dl代表)返回给方法的调用方了。别忘了还要把nil作为该方法的第二个结果值。

Take方法相比,myDownloaderPool类型的其他方法实现起来就极其简单了。我们仅仅调用其内部的实体池的相应方法并返回其返回的结果即可。这些方法的实现如下:

  1. func (dlpool *myDownloaderPool) Return(dl PageDownloader) error {
  2. return dlpool.pool.Return(dl)
  3. }
  4. func (dlpool *myDownloaderPool) Total() uint32 {
  5. return dlpool.pool.Total()
  6. }
  7. func (dlpool *myDownloaderPool) Used() uint32 {
  8. return dlpool.pool.Used()
  9. }

到这里,网页下载器池的实现已经编写完毕了。我们使用go build命令编译代码包webcrawler/downloader以确保上述代码在语法上正确无误。

网页下载器为网络爬虫框架提供了可以从几乎任何公开的、支持HTTP协议的目标服务器上下载资源的能力。并且,由于有了网页下载器池的支持,我们可以在资源消耗可控的前提下并发地进行上述操作。请注意,这里使用了“几乎”这个词。这是有原因的。有些服务器只为通过其认证或授权的客户端提供服务。这可能需要终端用户提供用户名和密码,或者需要客户端在HTTP请求的头部中加入一些特殊的信息,再或者需要我们使用另外的一套协议与之交互。这些先决条件有的可以通过对网络爬虫框架的定制来满足,而有的则需要专门的流程设计和实现。由于本书主题和篇幅的原因,我们暂时不考虑后者而只关注前者。在后面的讲解中,读者会了解到采用怎样的定制方式才能够使用网络爬虫框架下载到需要认证或授权的数据。比如,下一小节将要讲到的分析器就为此给使用方预留了足够的定制空间。

9.6.2 分析器

本小节会讲解分析器和分析器池。分析器为我们分析响应提供流程上的支持,而分析器池则为这一流程的并行化提供了支持。它们使用到的辅助工具与网页下载器和网页下载器池所用的并没有差别。不过,与后者不同的是,它们会为使用者提供非常大的定制空间。

接口类型AnalyzerAnalyze方法接受两个参数。一个代表了作为分析目标的响应,而另一个则代表了分析方法的列表。也就是说,分析需要的所有东西都会在Analyze方法被调用的时候传递给它。因此,我们可以让分析器的基本结构非常简单。请看下面的代码:

  1. // 分析器的实现类型。
  2. type myAnalyzer struct {
  3. id uint32 // ID。
  4. }

可以看到,分析器的实现类型myAnalyzer的声明中只包含了一个字段id。它被用来唯一地标识分析器。这与网页下载器中的id字段的用途是相同的。另外,为这个id赋值的时候同样会用到ID生成器,也同样会新建一个函数将当前实现与ID生成器隔离开,这个函数被命名为genAnalyzerId

分析器实现类型的基本结构很简单,因此用来创建它的NewAnalyzer函数的实现非常简洁。它的函数体只需包含一个复合字面量和一条return语句。如下:

  1. // 创建分析器。
  2. func NewAnalyzer() Analyzer {
  3. return &myAnalyzer{id: genAnalyzerId()}
  4. }

显然,若要使*myAnalyzer类型成为Analyzer接口类型的实现类型,就必须为它实现指针方法IdAnalyze。关于Id方法的实现,我们自不必多说。而Analyze方法的实现就要复杂得多了。

首先,该方法作为myAnalyzer类型的指针方法的声明是这样的:

  1. func (analyzer *myAnalyzer) Analyze(
  2. respParsers []ParseResponse,
  3. resp base.Response) (dataList []base.Data, errorList []error)

我们已经知道,对响应resp的分析以及数据列表和错误列表的产出的工作完全是由参数respParsers中的若干个响应解析函数来完成的。因此,Analyze的主要任务就是把响应依次传递给这些响应解析函数并获取结果,然后再把这些结果汇总并返回给调用它的一方。下面我们就依据这个思路来编写Analyze函数的实现。

作为一个函数,首先应该检查每个参数值的有效性。对于参数respParsers来说,我们直接判断它是否为nil就可以:

  1. if respParsers == nil {
  2. err := errors.New("The response parser list is invalid!")
  3. return nil, []error{err}
  4. }

而参数resp是结构体类型的,因此即使是其零值也不会为nil。我们应该把检查的重点放在它的httpResp字段上。还记得吗?我们可以通过调用它的HttpResp方法来获取该字段的值。具体如下:

  1. httpResp := resp.HttpResp()
  2. if httpResp == nil {
  3. err := errors.New("The http response is invalid!")
  4. return nil, []error{err}
  5. }

在这之后,我们可以记录一些日志以表示将要处理一个有效的HTTP响应:

  1. var reqUrl *url.URL = httpResp.Request.URL
  2. logger.Infof("Parse the response (reqUrl=%s)... \n", reqUrl)

其中的标识符logger代表了一个logging.Logger类型的值。我们在之前多次提到过logging代码包以及其中的Logger。后者是一个定义了日志记录器行为的接口类型。它们同样存在于goc2p项目之中。我们在后面还会多次提到它们。值得一提的是,为了能够对在网络爬虫框架中使用的日志记录器进行统一配置,我们在webcrawler/base代码包中专门编写了一个被用来获取日志记录器的函数:

  1. // 创建日志记录器。
  2. func NewLogger() logging.Logger {
  3. return logging.NewSimpleLogger()
  4. }

在网络爬虫框架的其他代码包中,如果要声明一个代表了日志记录器的变量并为它赋值,那么它们就可以这样编写:

  1. // 日志记录器。
  2. var logger logging.Logger = base.NewLogger()

当然,前提是当前的源码文件已经导入了webcrawler/base代码包。

这样做有一个明显的好处,那就是:当我们要改变日志记录器的创建和初始化方式的时候,仅需修改base.NewLogger函数的实现即可。这有效地避免了散弹式的修改。请想象一下,如果我们在每个需要用到日志记录器的地方都这样为logger变量赋值:

  1. var logger logging.Logger = logging.NewSimpleLogger()

那么情况将会怎样?

言归正传。在myAnalyzer类型的指针方法Analyze中,我们调用了loggerInfof方法记录了一条普通的信息。这条信息包含了这段代码将要执行的操作以及与该HTTP响应对应的URL(URL可以让我们很方便地区分响应)。这很重要。因为这样我们就可以从日志中看到哪些时候分析器处理了哪些HTTP响应了。

接下来,我们应该取出参数resp的另外一个字段的值以备用:

  1. respDepth := resp.Depth()

变量respDepth的值即指该响应包含的网页深度。从该网页的内容中分析出的网络地址及其可能对应的网页的深度应该与前者有递增关系。

现在开始准备通过参数respParsers的值分析HTTP响应。先要建立两个分别可以收集数据(请求或条目)和错误的容器:

  1. // 解析HTTP响应。
  2. dataList = make([]base.Data, 0)
  3. errorList = make([]error, 0)

然后,使用respParsers参数值中的响应解析函数对httpResp变量的值进行分析。大致的代码如下:

  1. for i, respParser := range respParsers {
  2. // 省略若干条语句
  3. pDataList, pErrorList := respParser(httpResp, respDepth)
  4. // 省略若干条语句
  5. }

可以看到,我们在for语句依次拿到respParsers中的每个响应解析函数,然后分别调用它们并获取结果。不过,在这之前,我们还有一些工作要做。

还记得吗?Go语言的函数类型的值可以为nil。为了保证调用成功,我们应该首先判断当前的响应解析函数是否为nil。如果答案是肯定的,那么我们就应该立即生成一个可以说明此问题的错误值,然后忽略余下的操作并去获取下一个响应解析函数。相关代码如下:

  1. if respParser == nil {
  2. err := errors.New(fmt.Sprintf("The document parser [%d] is invalid!", i))
  3. errorList = append(errorList, err)
  4. continue
  5. }

我们在错误的描述中加入了这个为nil的响应解析函数在respParsers中的索引。希望这样能够帮助Analyze方法的调用方快速定位问题所在。之后,我们把该错误值追加到先前声明的切片值(由errorList变量代表)中,以使它可以被传递到方法之外。最后,我们使用continue语句让包含它的那条for直接进行下一个迭代。

一旦当前的响应解析函数通过检查,我们就可以放心地把HTTP响应httpResp交给它处理了。待其分析完成之后,我们就会得到两个切片值。它们分别由变量pDataListpErrorList代表,并分别表示了从该HTTP响应中分析出的数据的列表以及在这个过程中发生的错误的列表。

理所应当,我们应该把每个响应解析函数返回的这样的两个列表都分别追加到dataListerrorList中去。注意,对于这两个追加操作,我们有着不同的要求。

我们把执行追加操作的代码封装在一个名为appendDataList函数中,并在针对pDataList变量的for语句中调用它:

  1. if pDataList != nil {
  2. for _, pData := range pDataList {
  3. dataList = appendDataList(dataList, pData, respDepth)
  4. }
  5. }

函数appendDataList的声明如下:

  1. // 添加请求值或条目值到列表。
  2. func appendDataList(dataList []base.Data, data base.Data, respDepth uint32)
  3. []base.Data

它接受3个参数。第一个参数的值必须是会被Analyze方法返回的数据容器,第二个参数的值应该是pDataList中的某一个数据项,而第三个参数的值则应当是当前被分析的那个响应的深度值。此外,该函数还会返回一个[]base.Data类型的结果。该结果的值即是被追加数据之后的数据容器。

函数appendDataList使用前两个参数值的方式非常明显。不过在这之中,我们还要进行一些特殊的处理。这涉及新的请求的深度。

按照惯例,第一步操作是检查参数data的值的有效性。因为该值是从响应解析函数中返回的。它来自网络爬虫框架之外。我们总是应该先检查这样的值的有效性。如果该值为nil,我们就应该忽略追加操作,并直接把原来的数据容器dataList返回给appendDataList函数的调用方。代码如下:

  1. if data == nil {
  2. return dataList
  3. }

还记得吗?起到类似作用的语句有个学名,叫卫述语句。

如果data的值不为nil,我们就继续进行后面的操作。我们检查data代表的新请求中的深度值。不过,前提是data代表的是请求而不是条目。这就需要我们先进行判断。这会用到类型断言表达式:

  1. req, ok := data.(*base.Request)

我们试图用类型断言表达式把data的值转换为base.Request类型的值。由于base.Request类型代表了请求,并且是base.Data接口类型的实现类型,所以当该值为一个请求的时候,这一类型转换就会成功。成功与否会由ok变量的值来体现。如果ok的值为false,那么就说明类型转换不成功,该数据并不是一个请求。既然该数据不是一个请求,那一定就是一个条目。若为这种情况,我们无需进行任何特殊处理,而直接把该数据追加到数据容器中并返回追加操作的结果即可:

  1. if !ok {
  2. return append(dataList, data)
  3. }

如果oktrue,那么变量req就会是一个有效的*base.Request类型值。此时,我们就要通过调用它的Depth方法来检查它的深度值是否是正确的。如果不正确,我们就应该重新创建并初始化一个请求来替代它。请看下面的代码:

  1. newDepth := respDepth + 1
  2. if req.Depth() != newDepth {
  3. req = base.NewRequest(req.HttpReq(), newDepth)
  4. }

其中,变量newDepth的值代表了新请求应该具有的深度值。

我们之所以替换这个请求而不是直接改变它,是因为*base.Request类型没有给我们提供这样的方法。并且,我们不应该为此而放宽这种限制。实际上,我们对这些网络爬虫框架的基本数据类型都采用了这样的设计。这样可以让它们的值具有不可变性。如此一来,当我们把一个这样的值传递给一个未处于webcrawler/base代码包中的函数或方法的时候,就不用担心该值会被改变了。这样既可以消除很多方面的顾虑,又可以简化相关代码。

不过,这样也带来一个问题,就是当我们想修改这样的值的时候不得不先对它进行完整的复制。就像我们在前面示例中做的那样。如果存在非常多的类似需求,那么我们就应该仔细考虑是否依然保留这种不可变性。因为如此复制一般会消耗大量的内存,并且给运行时系统的垃圾回收器造成很大的压力。

考虑到这里只是在发现响应解析函数返回的请求的深度不正确的时候才会复制它,我们无需改变现有的设计。

经过这样的检查和必要的修正之后,我们就可以把新请求追加到数据容器并返回它了:

  1. return append(dataList, req)

另一方面,我们对于每个响应解析函数返回的代表若干错误的切片值pErrorList也需要做类型的处理。区别是,我们不需要对其中的错误值做任何检查。相似地,我们把追加的操作也封装在了一个函数中:

  1. // 添加错误值到列表。
  2. func appendErrorList(errorList []error, err error) []error {
  3. if err == nil {
  4. return errorList
  5. }
  6. return append(errorList, err)
  7. }

而在Analyze函数中的那条for代码块的最后,我们这样来调用这个函数:

  1. if pErrorList != nil {
  2. for _, pError := range pErrorList {
  3. errorList = appendErrorList(errorList, pError)
  4. }
  5. }

以上就是被用来真正处理一个响应的for代码块中的全部代码。一旦这条for语句被执行完成,就意味着对当前的响应的分析处理已经结束。这时,我们就可以将数据容器和错误容器返回给Analyze方法的调用方了:

  1. return dataList, errorList

至此,我们已经编写完成了Analyze方法和myAnalyzer类型。编译一下,以保证*myAnalyzer类型真正实现了Analyzer接口类型(如果NewAnalyzer函数通过了编译就可以证明)以及不存在其他语法错误。

我们通过以响应解析函数(由ParseResponse类型的值代表)作为参数的方式来接纳使用方对分析器的定制。同时,经过思考的接口设计也为这些定制的行为和网络爬虫框架之间的融合提供了很好的支持。实际上,对响应进行分析的工作是非常复杂的。正因为如此,网络爬虫框架不应该介入到这些复杂的逻辑当中,而应该专心为它们提供良好的执行环境。使用方提供的若干个响应解析函数会被网络爬虫框架中的所有分析器使用。与网页下载器类似,这些分析器会由一个分析器池容纳和管理。

有了编写网页下载器池的经验,我们再来实现分析器池会非常简单。请读者回顾9.2节中的与池的接口设计相关的内容。可以看到,这两个池的接口类型声明极其相似。这也是我们在前面编写实体池并以此来为这两者的实现提供底层支持的原因。

现在是练手的好机会。请读者模仿网页下载器池的实现方式自己编写一个名为myAnalyzerPool的类型,并使该类型成为AnalyzerPool接口类型的实现类型。请记住,虽然myAnalyzerPool类型实现起来会与myDownloaderPool类型非常相近,但是我们应该把它们独立开来,包括像ID获取函数这类的辅助代码。这样,我们今后就可以很方便地单独改变和演进它们了。

网页下载器和分析器会更多地使用不同类型的系统资源。前者主要利用网络I/O完成功能,而后者专注于内部计算。这种需要上的不同会使得它们慢慢向着不同的方向进化。因此,从一开始就在抽出并共享池的本质行为(指的是实体池)的前提下分离这两个应用层面的池的实现是很有好处的。

请读者真正地去实现一个分析器池,不要偷懒。这并不困难。请在写完那些代码并通过编译之后再回到这里继续往下读。

好了,在实现了网页下载器和分析器以及容纳它们的池以后,我们编写网络爬虫框架的处理模块的任务已经完成了大半。最后这个模块与前两模块有很多不同之处。它不需要用池来管理,并且只需一个实例就足够提供必要的功能。它的实例是几乎不可被改变的。另一方面,它提供定制的方式与分析器有一些类似。

9.6.3 条目处理管道

条目处理管道会以流的方式来处理我们发送给它的每一个条目。每一个处理步骤都由一个条目处理器来代表。每个条目处理器都会接受一个条目并返回一个已经经过处理的条目和一个错误值。这些条目处理器的行为是由webcrawler/itempipeline代码包中的ProcessItem函数类型来定义。

根据ProcessItem函数类型以及同一个代码包中的接口类型ItemPipeline对条目处理管道的行为的定义,我们可以得出以下声明:

  1. // 条目处理管道的实现类型。
  2. type myItemPipeline struct {
  3. itemProcessors []ProcessItem // 条目处理器的列表。
  4. failFast bool // 表示处理是否需要快速失败的标志位。
  5. sent uint64 // 已被发送的条目的数量。
  6. accepted uint64 // 已被接受的条目的数量。
  7. processed uint64 // 已被处理的条目的数量。
  8. processingNumber uint64 // 正在被处理的条目的数量。
  9. }

很明显,myItemPipeline类型中的字段itemProcessors代表了条目处理管道将会持有的若干个条目处理器,字段failFast的值会体现出条目处理管道的“快速失败”特性,而后面的4个字段都是为了满足统计的需要而建立的。

从网络爬虫框架的角度看,条目处理管道只会接受输入,而不会产生任何输出。这是由于它接受的每一个条目都已经可以被看作是某一个请求的最终产出的一部分或全部了。所以,那些经过条目处理器处理的条目,对网络爬虫框架本身已经没有任何意义了。它们是否需要被以某种形式输出到别处,完全由使用方提供的那些条目处理器来决定。条目处理管道不关心也无需关心这些额外的输出情况。然而,它却应该时刻关注整个条目处理流程的运作情况,比如调度器向它发送了多少个条目?在这些被发送的条目中有多少条目是有效的?有多少条目经过了所有的处理?以及在某一个时刻有多少条目正在被处理?只要知道了这些数字,我们就可以比较完整地勾勒出条目处理管道的实时运行情况了。这也是myItemPipeline类型的基本结构中的最后那4个字段的存在意义。

这4个字段是无需被初始化的。因为它们的零值都是0。这正好符合我们的要求。failFast字段也是如此。原因是我们可以以后通过调用SetFailFast方法改变它的值。因此,唯一需要使用方初始化的就是条目处理器的列表了。在myItemPipeline类型中,它们由itemProcessors字段代表。

我们现在编写一个名为NewItemPipeline的函数,并让它接受一个[]ProcessItem类型的参数以及返回一个ItemPipeline类型的结果。它的声明如下:

  1. func NewItemPipeline(itemProcessors []ProcessItem) ItemPipeline

在这个函数的函数体中,我们首先要检查itemProcessors参数的值。由于我们之后不会再去关心条目处理的结果,所以这里对条目处理器列表的检查要更加严格一些。请看下面的语句:

  1. if itemProcessors == nil {
  2. panic(errors.New(fmt.Sprintln("Invalid item processor list!")))
  3. }
  4. innerItemProcessors := make([]ProcessItem, 0)
  5. for i, ip := range itemProcessors {
  6. if ip == nil {
  7. panic(errors.New(fmt.Sprintf("Invalid item processor[%d]!\n", i)))
  8. }
  9. innerItemProcessors = append(innerItemProcessors, ip)
  10. }

一旦发现itemProcessors字段的值为nil,就会立即引发一个运行时恐慌。不但如此,当发现列表中的某一个条目处理器为nil时,也会及时以引发运行时恐慌的方式上报。如果以上检查都通过了,我们就会得到一个与itemProcessors参数同值的变量innerItemProcessors。之所以要完全复制一份条目处理器列表,是因为我们不希望在用它来初始化条目处理管道之后使用者仍然有机会改变它。还记得吗?一个切片值持有的是一个值的引用(它会与一个底层数组相对应)。同时,它也是一类值的容器。所以,在把它作为参数值传递给一个函数或方法之后,我们对其中的某个或某些值的变更,一定也会影响到已被传入到函数或方法中的那个值。对于同样属于引用类型的字典值来说也是如此,而对于数组值来说却恰恰与之相悖。

综上所述,我们使用使用者提供的条目处理器列表的复制品来初始化条目处理管道是安全的。因为这样做使得真正起作用的那个值与外界隔离了。最后,初始化代码非常简单,只有一行:

  1. return &myItemPipeline{itemProcessors: innerItemProcessors}

以上说明的就是NewItemPipeline函数的实现代码的全部。显然,我们要把*myItemPipeline类型实现为ItemPipeline接口类型的实现类型。这需要我们依据这个接口类型的声明为myItemPipeline类型编写6个对应的指针方法。我们首先来看最为复杂的Send方法的实现。其声明如下:

  1. func (ip *myItemPipeline) Send(item base.Item) []error

myItemPipeline类型来说,这个指针方法Send应该具有下面4个功能。

  • 检查item的值的有效性。忽略对无效条目的处理。

  • 依次调用itemProcessors字段中的条目处理器对有效的条目进行处理,并依据failFast字段的值控制处理流程。

  • 收集处理过程中发生的错误,并将相应的值作为结果值返回。

  • 在整个处理的过程中,适时的对字段sentacceptedprocessedprocessingNumber的值进行设置,以满足记录和统计的需要。

其中,第一、二个功能点表示了Send方法需要执行的流程,而实现第三、四个功能点的代码则会夹杂在其中。

首先,我们来看第一个功能点。还记得吗?base.Item类型是一个字典类型的别名类型。因此,它的值可能为nil。加之我们对条目中应该包含哪些键值对并无要求,所以这里仅对item的值进行一项检查即可。显然,如果条目为nil,那么也就没必要对它进行处理了。相关代码如下:

  1. errs := make([]error, 0)
  2. if item == nil {
  3. errs = append(errs, errors.New("The item is invalid!"))
  4. return errs
  5. }

请注意,我们在一开始就要留心收集错误值。这就是声明errs变量的原因。实际上,在Send方法执行流程的过程中,只要发生错误,我们就会把对应的错误值追加到errs的值中。

接下来是第二个功能点。读者可能已经猜到,实现第二个功能点的代码的主体是一条for语句。它的迭代对象就是itemProcessors字段的值。我们需要利用这个for代码块完成对条目的流式处理。已知每个条目处理器都会接受一个条目,也都会返回一个已经经过处理的条目和一个错误值。由此,在发现某一个条目处理器返回了一个非nil的错误值的时候,我们应该根据failFast值来决定是否中断对该条目的处理。相关代码如下:

  1. var currentItem base.Item = item
  2. for _, itemProcessor := range ip.itemProcessors {
  3. processedItem, err := itemProcessor(currentItem)
  4. if err != nil {
  5. errs = append(errs, err)
  6. if ip.failFast {
  7. break
  8. }
  9. }
  10. if processedItem != nil {
  11. currentItem = processedItem
  12. }
  13. }

请注意,其中的变量currentItem起着很关键的作用。我们就是利用它让条目依次流经各个条目处理器的。此外,在这个过程中也存在着类似的收集错误值的代码。很明显,在该方法体的最后,我们是要将变量errs的值作为结果值返回的。

再来说第四个功能点。读者感觉我们应该在上述代码的哪些位置上插入对myItemPipeline类型的最后那4个字段的值的设置代码呢?最明显的当属设置sent字段的值的位置了。它应该出现在该方法体的最前面。不过,有一个设置会比它更靠前。这就是对processingNumber字段的值的设置。processingNumber字段的值代表了正在被条目处理管道处理的条目的数量。所以,Send方法一经调用就应该首先递增它的值。注意,由于Send方法一定会被并发地调用,所以我们必须要使用同步手段来保护这类操作。这其中还包含了对processingNumber字段的值的递减操作。由于该操作需要在Send方法即将被执行结束的时候进行,所以我们需要把该操作放在defer语句中。总之,我们应该在Send方法的方法体的开始处加入下面这几行代码:

  1. atomic.AddUint64(&ip.processingNumber, 1)
  2. defer atomic.AddUint64(&ip.processingNumber, ^uint64(0))
  3. atomic.AddUint64(&ip.sent, 1)

可以看到,我们通过调用sync/atomic包中的相应方法来保证递增或递减操作的并发安全。另外,请注意,我们递减uint64类型的字段processingNumber的值的方式。在本书中,这种特殊用法首次出现在8.3节。

现在来看设置accepted字段的值的时机。让Send方法接受并准备处理一个条目的前提是该条目是有效的。因此,这行代码:

  1. atomic.AddUint64(&ip.accepted, 1)

应该紧跟在对条目进行有效性检查的代码(那条if语句)之后。

最后是对processed字段的值的设置。已被处理的含义是,条目已被所有条目处理器处理过,不论在这个过程中是否有错误发生。注意,如果failFast字段的值为true,那么条目已被处理也意味着在处理过程中未发生任何错误。否则,这二者之间就不存在必然的联系。总之,我们应该把代表该递增操作的代码

  1. atomic.AddUint64(&ip.processed, 1)

置于前面那条for语句的后面。

好了,以上就是Send方法包含的所有代码。读者应该可以以正确的顺序把它们排列好。下面我们继续讲剩下的5个方法的实现方法。

针对于failFast字段的FailFast方法和SetFailFast方法的实现极其简单。我们在此略过。

方法Count需要返回3个计数值。就myItemPipeline类型而言,它们即是字段sentacceptedprocessed的值。这看起来也非常简单。不过有一点需要注意,就是我们一定要使用原子操作来读取它们的值:

  1. counts := make([]uint64, 3)
  2. counts[0] = atomic.LoadUint64(&ip.sent)
  3. counts[1] = atomic.LoadUint64(&ip.accepted)
  4. counts[2] = atomic.LoadUint64(&ip.processed)

这也同样适用于ProcessingNumber方法的实现。其中的读取processingNumber字段的值的方式应该是完全一致的。

最后是Summary方法。Summary方法返回的值应该反映当前的条目处理管道的一般状态和实时状态。这涉及了myItemPipeline类型中的所有字段。请看我们为此定义的摘要信息的模板:

  1. var summaryTemplate = "failFast: %v, processorNumber: %d," +
  2. " sent: %d, accepted: %d, processed: %d, processingNumber: %d"

下面我们来为它填空。需要准备的值共有6个。第一个值很容易得到,直接使用failFast字段的值即可。后面的“processorNumber”指的是条目处理器的数量。所以我们需要使用itemProcessors字段的长度值来填充第二个空当。至于后面的4个空当,我们同样可以直接用字段sentacceptedprocessedprocessingNumber的值来填充。不过,我们在这里同样要使用原子操作。鉴于我们已经实现了Count方法和ProcessingNumber方法,所以可以不必再去调用sync/atomic包中的函数。到这里,我们就可以编写出Summary方法的完整声明了:

  1. func (ip *myItemPipeline) Summary() string {
  2. counts := ip.Count()
  3. summary := fmt.Sprintf(summaryTemplate,
  4. ip.failFast, len(ip.itemProcessors),
  5. counts[0], counts[1], counts[2], ip.ProcessingNumber())
  6. return summary
  7. }

以上就是条目处理管道的全部实现代码。最后,我们需要使用go build命令对webcrawler/ itempipeline代码包进行编译,以保证代码的合法性并确保*myItemPipeline类型已经实现了接口类型ItemPipeline

到这里,我们已经讲述了网络爬虫框架中的全部处理模块的接口设计和实现代码。这些处理模块都做到了职责上的单一和清晰。这也是我们设计的方向。它们都专注于做好某一个方面的事情,而毫不关心爬取流程中的其他部分。我们依靠调度器把这些处理模块串联起来,并使整个流程能够被真正地运转起来。

我们在下一节就会专门讲解调度器的实现。读者会从中了解到,调度器不只是使用通道管理器在各个处理模块之间搬运数据那么简单。在它的实现中还包含了很多因地制宜的设计技巧。下面,就让我们进入到下一节。