9.4 详细设计

在本节中,我们会逐一讲解网络爬虫框架中的各个模块所涉及的数据结构和接口。在这个过程中,我们还会了解到控制和调度这些模块及其实例时可能需要用到的工具。

9.4.1 基本数据结构

为了承载和封装数据,我们需要先声明一些基本的数据结构。网络爬虫框架中的各个组件都会用到这些数据结构。所以,可以说它们是这一程序的基础。

我们在分析网络爬虫框架的需求的时候提到过这样几类数据:请求、响应、条目。下面我们就逐个讲解它们的声明和设计理念。

请求是被用来承载与某一个网络地址对应的HTTP请求的。它会由调度器或分析器生成并被传递给网页下载器。网页下载器会根据它从远程服务器下载相应的网页。因此,它应该有一个类型为net/http代码包中的Request类型的字段。不过,为了减少不必要的零值生成(http.Request是一个结构体类型,它的零值不是nil)和实例复制,我们应该把*http.Request作为该字段的类型。下面是base.Request类型的声明的第一个版本:

  1. // 请求。
  2. type Request struct {
  3. httpReq *http.Request // HTTP请求的指针值。
  4. }

我们把基本数据结构的声明都放在了goc2p项目下的webcrawler/base代码包中。因此,该类型的限定符是base.

从我们已经提到的与此相关的功能需求来看,这样的声明已经足够了。不过,我们也说过网络爬虫应该能够在爬取过程结束之后自动停止。那么网络爬虫在对一个网站上的网页爬取到什么程度才能够算作是爬取过程的结束呢?对网页爬取程度的一个比较常用的量化方法是计算每个被下载的网页的深度。网络爬虫应该可以根据最大深度的预设值忽略掉对“更深”的网页的下载操作。当所有在该最大深度范围内的网页都被下载完成的时候,就意味着爬取过程即将完成。待这些网页也都被分析和处理完成,就能够判定网络爬虫对爬取过程的执行的结束了。因此,为了记录网页的深度,我们还应该在base.Request类型的声明加入一个字段。综上所述,该类型声明的第二个版本如下:

  1. // 请求。
  2. type Request struct {
  3. httpReq *http.Request // HTTP请求的指针值。
  4. depth uint32 // 请求的深度。
  5. }
  6. // 创建新的请求。
  7. func NewRequest(httpReq *http.Request, depth uint32) *Request {
  8. return &Request{httpReq: httpReq, depth: depth}
  9. }
  10. // 获取HTTP请求。
  11. func (req *Request) HttpReq() *http.Request {
  12. return req.httpReq
  13. }
  14. // 获取深度值。
  15. func (req *Request) Depth() uint32 {
  16. return req.depth
  17. }

我们希望base.Request类型的值是不可变的。也就是说,在一个base.Request类型的值被创建和初始化之后,当前代码包之外的代码不能对它的任何字段的值进行更改。一般对于这样的需求,我们会通过以下3个步骤来实现它。

  1. 把该类型的所有字段的访问权限都设置为包级私有。也就是说,要保证这些字段的名称的首字母均为小写。

  2. 编写一个创建和初始化该类型的值的函数。由于该类型的所有字段均不能被当前代码包之外的代码直接访问,所以它们自然也就无法为这样的字段赋值。这样也是我们需要在该类型声明所属的代码包内编写这样一个函数的原因。这类函数的名称一般都以“New”为前缀。它们会接受一些参数值并以此为基础初始化一个目标类型的值,然后再把该值返回给函数的调用方。

  3. 编写必要的用来获取某个字段的值的方法。这一步骤并不是必须的。原因可能是有的类型想要隐藏其实现细节,也可能是它的字段的类型(比如切片类型、字典类型、指针类型和一些结构体类型)不允许它们这样做。

显然,我们就是依照上述这3个步骤来编写base.Request类型声明的第二个版本的。注意,NewRequest函数的结果的类型是base.Request,而不是base.Request。这样做的主要原因是,我们是使用base.Request类型作为相关方法的接收者的类型的。

在这里,我们再专门说明一下base.Request类型的depth字段。理论上,uint32类型已经可以使depth字段的值足够大了。由于深度值不可能是负数,所以我们也不需要为此而牺牲正整数的部分取值范围。我们传递给调度器的首次请求的深度应该是0。这也是首次请求的一个标识。那么,后续请求的深度应该怎样计算和传递呢?假设网页下载器发出了首次请求并成功接收到了响应,经过分析器的分析,我们在其中找到了若干个新的网络地址并生成了新的请求,那么这两个新请求的深度就为1。比如,这里有两个新的请求:请求A和请求B。如果我们在接收并分析了请求A的响应之后得到了请求C,那么请求C的深度就是2。以此类推。我们可以把首次请求看作是请求A和请求B的父请求。反过来讲,可以把请求A和请求B视作首次请求的子请求。因此,就有了这样一条规则:一个请求的深度等于对它的父请求的深度递增一次后的结果。

在理解了我们刚刚对请求深度计算方法的描述之后,读者可能会发现:只有对某个请求的响应内容进行分析之后,才可能会需要生成新的请求。并且,调度器并不会直接把请求作为参数传递给分析器。这样不符合我们先前对数据流转方式的设计,同时也会使这两个处理模块之间的交互变得混乱。显然,响应也应该携带深度值。一方面,这可以算作是标示响应的深度的一种方式。另一方面,也是更重要的一方面,它可以作为新请求的深度值的计算依据。因此,base.Response类型的声明如下:

  1. // 响应。
  2. type Response struct {
  3. httpResp *http.Response
  4. depth uint32
  5. }
  6. // 创建新的响应。
  7. func NewResponse(httpResp *http.Response, depth uint32) *Response {
  8. return &Response{httpResp: httpResp, depth: depth}
  9. }
  10. // 获取HTTP响应。
  11. func (resp *Response) HttpResp() *http.Response {
  12. return resp.httpResp
  13. }
  14. // 获取深度值。
  15. func (resp *Response) Depth() uint32 {
  16. return resp.depth
  17. }

这个类型的声明应该不用我再做解释了。它的各个部分的含义与base.Request类型的基本一致。

除了请求和响应这两个有着对应关系的数据结构之外,我们还需要定义条目的结构。条目的实例需要储存的内容应该会比请求和响应复杂得多。因为对响应的内容进行筛选并生成出条目的规则也是由网络爬虫框架的使用者自己制定的。因此,条目的结构应该足够地灵活。其实例应该可以容纳所有有可能从响应内容中筛选出的数据。基于此,我们编写出了条目的类型声明:

  1. // 条目。
  2. type Item map[string]interface{}

我们把Item类型声明为字典类型map[string]interface{}的别名类型。这样就可以最大限度地存储多样的数据了。由于条目处理器也应该是由网络爬虫框架的使用者提供的,所以这里并不用考虑字典中的各个元素值是否可以被条目处理器正确理解的问题。

好了,我们需要的3个基本数据类型都在这里了。为了能够用一个类型从整体上标识这3个基本数据类型,我们又声明了base.Data接口类型:

  1. // 数据的接口。
  2. type Data interface {
  3. Valid() bool // 数据是否有效。
  4. }

这个接口类型只有一个名为Valid的方法。我们可以通过调用该方法来判断数据的有效性。显然,base.Data接口类型的作用更多的是作为某一类的类型的标签,而不是被用于定义这类类型的行为。为了让代表请求、响应或条目的类型都实现Data接口,我们又在当前的库源码文件中添加了这样几个方法:

  1. // 数据是否有效。
  2. func (req *Request) Valid() bool {
  3. return req.httpReq != nil && req.httpReq.URL != nil
  4. }
  5. // 数据是否有效。
  6. func (resp *Response) Valid() bool {
  7. return resp.httpResp != nil && resp.httpResp.Body != nil
  8. }
  9. // 数据是否有效。
  10. func (item Item) Valid() bool {
  11. return item != nil
  12. }

这3个方法分别使base.Request类型、base.Response类型和base.Item类型实现了base.Data接口类型。这3个类型因base.Data接口类型而被归为了一类。在后面的章节中,我们会了解到这样做还有另外的功效。

至此,实现网络爬虫框架需要用到的基本数据类型均已编写完成。不过,在这里我们还需要一个额外的类型。这个类型是作为error接口类型的实现类型而存在的。它的主要作用是封装爬取过程中出现的错误,并以统一的方式生成字符串形式的描述。我们已经知道,只要某个类型的方法集合中包含了下面这个方法就等于实现了error接口类型:

  1. func Error() string

为此,我们首先声明了一个名为CrawlerError接口类型:

  1. // 爬虫错误的接口。
  2. type CrawlerError interface {
  3. Type() ErrorType // 获得错误类型。
  4. Error() string // 获得错误提示信息。
  5. }

其中,Type方法的结果类型ErrorType只是一个string类型的别名类型而已。另外,由于CrawlerError类型的声明中也包含了Error方法,所以只要某个类型实现了它,就等于实现了error接口类型。先编写这样一个接口类型而不是直接编写出error接口类型的实现类型的原因有两个。第一,我们在编程过程中应该遵循面向接口编程的原则。关于此,我们已经提到过多次。第二是为了扩展error接口类型。读者应该已经熟知,网络爬虫框架拥有多个处理模块。错误类型值应该可以表明该错误是哪一个处理模块抛出的。这也是CrawlerError类型中的Type方法所起到的作用。

下面,就让我们来实现这个接口类型。遵照本书中对实现类型的命名风格,我们声明了结构体类型myCrawlerError

  1. // 爬虫错误的实现。
  2. type myCrawlerError struct {
  3. errType ErrorType // 错误类型。
  4. errMsg string // 错误提示信息。
  5. fullErrMsg string // 完整的错误提示信息。
  6. }

字段errMsg的值应该由初始化myCrawlerError类型值的一方给出。这与传递给errors.New函数的参数值的含义类似。作为附加信息,errType字段的值就应该是该类型的Type方法的结果值。它代表了错误类型。为了便于使用者为该字段赋值,我们还声明了一些常量:

  1. // 错误类型常量。
  2. const (
  3. DOWNLOADER_ERROR ErrorType = "Downloader Error"
  4. ANALYZER_ERROR ErrorType = "Analyzer Error"
  5. ITEM_PROCESSOR_ERROR ErrorType = "Item Processor Error"
  6. )

可以看到,这3个常量的类型都是base.ErrorType类型。它们分别与网络爬虫框架中的3个处理模块相对应。当某个处理模块在被执行的过程中出现了错误时,程序就会使用对应的base.ErrorType类型的常量来初始化一个base.CrawlerError类型的错误值。具体的初始化方法就是使用webcrawler/base代码包中的NewCrawlerError函数。其声明如下:

  1. // 创建一个新的爬虫错误。
  2. func NewCrawlerError(errType ErrorType, errMsg string) CrawlerError {
  3. return &myCrawlerError{errType: errType, errMsg: errMsg}
  4. }

这个名称以“New”为前缀的函数的作用是创建和初始化一个CrawlerError类型的值。从该函数的函数体中的代码上我们可以看出,myCrawlerError类型应该是CrawlerError类型的一个实现类型。myCrawlerError类型的方法集合中应该包含CrawlerError接口类型中的Type方法和Error方法:

  1. // 获得错误类型。
  2. func (ce *myCrawlerError) Type() ErrorType {
  3. return ce.errType
  4. }
  5. // 获得错误提示信息。
  6. func (ce *myCrawlerError) Error() string {
  7. if ce.fullErrMsg == "" {
  8. ce.genFullErrMsg()
  9. }
  10. return ce.fullErrMsg
  11. }

眼尖的读者可能已经发现,指针方法Error中用到了myCrawlerError类型的fullErrMsg字段。并且,它还调用了一个名为genFullErrMsg的方法。现在我们就来看看它们的作用。下面是genFullErrMsg方法的实现:

  1. // 生成错误提示信息,并给相应的字段赋值。
  2. func (ce *myCrawlerError) genFullErrMsg() {
  3. var buffer bytes.Buffer
  4. buffer.WriteString("Crawler Error: ")
  5. if ce.errType != "" {
  6. buffer.WriteString(string(ce.errType))
  7. buffer.WriteString(": ")
  8. }
  9. buffer.WriteString(ce.errMsg)
  10. ce.fullErrMsg = fmt.Sprintf("%s\n", buffer.String())
  11. return
  12. }

方法genFullErrMsg同样是myCrawlerError类型的指针方法。它的功能是生成Error方法需要返回的结果值。可以看到,我们没有直接使用myCrawlerError类型值的使用方提供的值(即那个会被赋给errMsg字段的值),而是以它为基础生成了一条更完整的错误提示信息。在这条信息中,明确显示出它是一个网络爬虫的错误,也给出了错误的类型和详情。注意,我们把这条错误提示信息缓存在了fullErrMsg字段中。回顾该类型的Error方法的实现,只有当fullErrMsg字段的值为""时才会调用genFullErrMsg方法,否则会直接把fullErrMsg字段的值作为Error方法的结果值返回。这也是为了避免频繁地拼接字符串给程序性能带来的负面影响。我们在genFullErrMsg方法的实现中使用了bytes.Buffer类型值来作为拼接错误信息的手段。虽然这样做确实可以大大减小这一负面影响,但是由于myCrawlerError类型的值是不可变的,所以缓存错误提示信息还是很有必要的。其根本原因是,对这样的不可变值的缓存永远不会失效。

我们在前面展示的这些类型对于承载数据(不论是正常数据还是错误信息)来说已经足够用了。它们是网络爬虫框架中的基本元素。

9.4.2 接口的设计

这里所说的接口是指网络爬虫框架中的各个模块以及中间件中的重要组件的接口。与我们先前描述的基本数据结构不同,它们的主要职责是定义模块和组件的行为。在定义行为的过程中,我们会对它们应有的功能作进一步的审视,同时也会更多地思考它们之间的协作方式。

下面,我们就开始逐一地设计网络爬虫框架中的这类接口。为了更易于理解,我们会先从那几个处理模块的接口开始,然后再去考虑怎样去定义将会控制和管理这些处理模块的调度器以及它会用到的各种组件的行为。

注意 本小节描述的部分接口会在9.8节中被修改。因此,它们会与随书项目中的相应代码不一致。

1. 网页下载器

网页下载器的功用就是从网络中的目标服务器上下载网页。一个网页在网络中的唯一标识是网络地址。但是,网络地址只能起到定位网页在网络中的位置的作用,而并不是成功下载网页的充分条件。

我们已经知道,HTTP协议是基于TCP/IP协议栈的应用层协议。它是互联网世界的根基之一。因此,在互联网时代诞生的绝大多数语言都会使用不同的方式提供针对该协议的API。当然,Go语言也不例外。Go语言的标准库代码包net/http就提供了这些API。实际上,我们在编写网络爬虫框架的基本数据结构的时候,就使用到了其中的两个类型http.Requesthttp.Response。并且,不夸张地说,我们将要构建的网络爬虫框架就是以HTTP协议和net/http代码包中的API为基础的。

从网页下载器充当的角色来讲,它的功能只有两个:发送请求并接收响应。因此,我们可以设计出这样一个声明:

  1. func Download(req base.Request) (*base.Response, error)

函数Download的签名完全体现出了网络下载器应有的功能。不过网络下载器的接口不应只包含这一个声明。因为,我们已经明确表示过,网络爬虫框架应该同时使用若干个网页下载器来提供网页下载能力。进一步讲,这些网页下载器应该被放置于一个网页下载器池中。因此,网页下载器的实例还应该使用某种方式唯一地标识自己。这样的好处很多,至少有助于我们鉴别池中的不同网页下载器实例。

综上所述,我们已经可以给出网页下载器的接口类型声明了:

  1. // 网页下载器的接口类型。
  2. type PageDownloader interface {
  3. Id() uint32 // 获得ID。
  4. Download(req base.Request) (*base.Response, error) // 根据请求下载网页并返回响应。
  5. }

其中,方法Id会把当前的网页下载器实例的唯一标识“ID”作为结果值返回给方法调用方。在一个基于网络爬虫框架的应用程序的实例范围之内,一个ID应该足以唯一地标示一个网页下载器实例。说到这里,我们不得不提到作为中间件中的一个组件的工具——ID生成器。在这里,我们并不会讨论ID生成器的具体实现,而只会给出它的接口类型:

  1. // ID生成器的接口类型。
  2. type IdGenerator interface {
  3. GetUint32() uint32 // 获得一个uint32类型的ID。
  4. }

一个可以有将近43亿(2的32次方,即4 294 967 296)的取值的类型已经完全够用了。这肯定能够满足我们构建一个足够大的网页下载器池的需求。至少对现阶段来说是这样。因此,我们只为接口类型IdGenerator添加一个方法——GetUint32IdGenerator接口类型的实现类型应该保证:在GetUint32方法的调用次数超过2的32次方之前,每次调用所得到的数值都是不同的。并且,为了使之更加通用,还应该考虑到:在该调用次数超出uint32类型的取值范围之后,GetUint32方法返回的数值又该是怎样的。读者也可以思考一下这个问题,甚至试着编写出一个实现此接口的ID生成器。有了之前我们讲到的那些知识,这应该并不困难。要记得,ID生成器中的所有方法都应该是并发安全的。

在前面,我们多次提到了网页下载器池。那么这个池的功能定义又是怎样的呢?首先,我们应该可以在需要时从该池中取出一个网页下载器。其次,在我们完成对所持网页下载器的使用之后还可以将其归还给该池。最后,我们应该随时能了解到该池的使用状况。据此,我们有了这样一个接口类型声明:

  1. // 网页下载器池的接口类型。
  2. type PageDownloaderPool interface {
  3. Take() (PageDownloader, error) // 从池中取出一个网页下载器。
  4. Return(dl PageDownloader) error // 把一个网页下载器归还给池。
  5. Total() uint32 // 获得池的总容量。
  6. Used() uint32 // 获得正在被使用的网页下载器的数量。
  7. }

该接口声明中的注释很清楚地描述了其中每个方法的功能。其中,值得注意的是Total方法。我们在前面并没有明确网页下载器的总容量是否可变。Total方法的结果值会体现出该池总容量的可变性。如果池的总容量可变,那么在不同时刻调用该方法所得到的数值就可能会有所不同。

显然,实现总容量固定的池会比较容易。我们在后面编写池的实现的时候会专门讨论这个问题,并在功能的丰满与程序的复杂度之间进行权衡。

与网页下载器有关的所有接口的声明都被放到了goc2p项目的webcrawler/downloader代码包中。

2. 分析器

分析器的职责是根据给定的规则分析响应,并筛选出请求或条目。同时,它也应该能在单个的基于网络爬虫框架的应用程序中唯一地标识自己。下面是代表了分析器行为的接口类型的声明:

  1. // 分析器的接口类型。
  2. type Analyzer interface {
  3. Id() uint32 // 获得ID。
  4. Analyze(
  5. respParsers []ParseResponse,
  6. resp base.Response) ([]base.Data, []error) // 根据规则分析响应并返回请求和条目。
  7. }

接口类型Analyzer与网页下载器接口的声明风格非常类似。不过,作为提高可扩展性的努力的一部分,其中的Analyze方法还要接受一个很重要的参数。它就是元素类型为ParseResponse的切片respParsers

类型ParseResponse是一个函数类型。它的声明如下:

  1. // 被用于解析HTTP响应的函数类型。
  2. type ParseResponse func(httpResp *http.Response, respDepth uint32) ([]base.Data,
  3. []error)

声明这样一个函数类型的意义是让网络爬虫框架的使用者自定义响应分析规则,以及生成相应的请求和条目的方式。该函数类型的参数httpResp代表了目标服务器返回的HTTP响应,而参数respDepth则代表了该响应的深度。这些参数可以让函数实例(也就是符合此类型声明的函数)获取到执行分析和生成过程所需的各种信息。另一方面,它的结果声明列表表明,该函数类型的实例需要把经分析和筛选而生成的若干请求和条目作为结果返回。同时,如果在这个过程中发生了任何错误也要如实地上报。

可以看到,我们把整个的分析、筛选和生成的过程都让使用者自行定义,而不是只让其提供规则。这样做鉴于以下几个原因。

  • 对内容的分析和筛选往往与数据的生成方式之间有很强的关联性。若把这几个过程分离开来并由两方分别定义,必然会造成很多不必要的中间数据创建和传递。这样不但会使我们编写出不少冗余的代码、增加相应模块的复杂度,还会使网络爬虫框架的一些API(比如ParseResponse类型)变得不清晰和臃肿。

  • 对响应分析和其内容筛选的规则的制定是可以有据可依的。因为有一些标准是专门针对于此的。读者可参看W3C网站上关于XPath或XQuery的描述。我们可以直接遵循某一个标准而形成分析和筛选规则的制定方法。所以,把这项任务抛出去并不会给网路爬虫框架的使用者带来太多的不便。另外,在Go语言的标准库中并没有提供实现此类标准(如XPath或XQuery)或便于进行此项任务的代码包。虽然使用第三方的代码包当然是可行的,但是我并不想把此类第三方库作为网络爬虫框架的实现方案的一部分。这样无益于突出重点。况且,有些需要去访问的新的网络地址并不是直接包含在响应的内容中的,而是由相关的JavaScript程序动态生成的。这样的情况过于复杂和多样了。在现阶段,我们把对这些情况的处理留给作为需求方的网络爬虫框架的使用者应该会更好。

  • 如果读者对HTTP协议有一定的了解,那么就一定会知道:对于HTTP的请求来说,并不是仅仅需要填充请求主体那么简单。对于请求的头部信息,我们可能需要进行更加灵活多变的设置,包括但不限于对连接方式、缓存机制、字符编码集、Cookie以及授权证书的设定。这些设定往往会根据目标网站、访问身份以及响应的具体内容等因素的不同而不同。因此,如果网络爬虫框架把新请求的生成操作放在其内部,那么这种灵活的设置将很难进行,甚至会导致无法满足使用者的正常需求。当然,我们可以设计更加复杂的ParseResponse函数类型,甚至使用更加重量级的附带若干方法的类型来表现它。但是,从设计和编写该框架的目的来看,我们并不需要把它设计得如此复杂。

总之,在实现这类框架的初期,我们需要把它设计得简单明了一些,并且以可用性作为首要目标。在这之后,我们可以在改进和演化框架的过程中适当地增加它的功能,并以此让它可以适用于更多的情况。增加一些开箱即用的额外工具会是一个不错的选择。它们可以让使用者在编写针对框架的定制代码的时候减少一些工作量。

好了,让我们回到分析器的接口设计中来。通过Analyzer接口类型的Analyze方法,网络爬虫框架会拿到若干数据(请求或条目)实例和错误实例。这些实例会通过中间件中的工具被递送到不同的模块中。

对于允许有多个实例同时存在的分析器来说,我们也需要用一个池来管理它们。分析器池的功能需求与前面讲述的网页下载器池的功能需求完全一致。前者的声明如下:

  1. // 分析器池的接口类型。
  2. type AnalyzerPool interface {
  3. Take() (Analyzer, error) // 从池中取出一个分析器。
  4. Return(analyzer Analyzer) error // 把一个分析器归还给池。
  5. Total() uint32 // 获得池的总容量。
  6. Used() uint32 // 获得正在被使用的分析器的数量。
  7. }

有些读者可能会思考:我们是否可以统一一下这两个池的接口呢?对于Go语言这样不支持自定义泛型的编程语言来说,我们怎样做才能够达到统一的目的?读者可以先考虑一下这两个问题,我们会在后面再次提到它。

与分析器有关的所有接口的声明都放到了goc2p项目的webcrawler/analyzer代码包中。

3. 条目处理管道

顾名思义,条目处理管道的功能就是为条目的处理提供环境,并控制整体的处理流程。具体的处理的步骤由网络爬虫框架的使用者提供。实现单一处理步骤的程序称为条目处理器。这样的设计可以让网络爬虫框架与具体的条目处理步骤分离开来,同时又不至于丧失控制权。下面我们来看看条目处理管道的接口类型:

  1. // 条目处理管道的接口类型。
  2. type ItemPipeline interface {
  3. // 发送条目。
  4. Send(item base.Item) []error
  5. // FailFast方法会返回一个布尔值。该值表示当前的条目处理管道是否是快速失败的。
  6. // 这里的快速失败是指:只要对某个条目的处理流程在某一个步骤上出错,
  7. // 那么条目处理管道就会忽略掉后续的所有处理步骤并报告错误。
  8. FailFast() bool
  9. // 设置是否快速失败。
  10. SetFailFast(failFast bool)
  11. // 获得已发送、已接受和已处理的条目的计数值。
  12. // 更确切地说,作为结果值的切片总会有3个元素值。这3个值会分别代表前述的3个计数。
  13. Count() []uint64
  14. // 获取正在被处理的条目的数量。
  15. ProcessingNumber() uint64
  16. // 获取摘要信息。
  17. Summary() string
  18. }

该接口类型中最重要的方法就是Send方法。该方法使条目处理管道的使用方可以向它发送条目,以使其中的条目处理器对这些条目进行处理。FailFast方法和SetFailFast对应于条目处理管道的“快速失败”特性。方法的注释对这一特性已有清晰的描述。最后,添加方法CountProcessingNumberSummary更多的是出于程序监控的考虑。我们在后面会看到,Summary方法还会出现在更多接口类型的声明中。

细心的读者可能已经发现,接口类型ItemPipeline中的方法并没有体现出如何设置条目处理器序列。实际上,设计该接口类型的一个很重要的思路是:不能对条目处理管道中的条目处理器序列进行变更。换句话说,我们要求针对条目的处理流程在总体上是不可变的。这种不可变性会使该流程以及对它的控制保持简单。

既然不能改变条目处理器序列,那么我们只能在初始化条目处理管道的时候对它进行设置。由于这个序列的长度是不定的,所以我们需要用一个切片类型的值来代表它。该切片值的类型是[]ProcessItem。类型ProcessItem即是被用来代表条目处理器的类型。其声明如下:

  1. // 被用来处理条目的函数类型。
  2. type ProcessItem func(item base.Item) (result base.Item, err error)

函数类型ProcessItem接受一个需要被处理的条目,并把被处理后的条目和可能发生的错误作为结果值返回。如果第二个结果值不为nil,则说明在这个处理的过程中发生了一个错误。

我们使用一个函数类型来代表条目处理器的原因是:单一的函数往往可以被安全地并发执行,除非它使用到了某类共享资源。相比之下,实现一个接口类型的方式太多。作为框架,我们几乎无法给出并发安全方面的强制性约束。然而,对于条目处理器来说,保证并发安全性又是非常重要的。所有被送入条目处理管道的条目都会被其中的条目处理器并发地处理。这就让我们不得不尽量对条目处理器的实现进行约束。

由于条目处理管道在用途和设计上不同于网页下载器和分析器,并且有了那个可以使条目处理器的并发安全性基本上得到保障的ProcessItem函数类型,所以我们并不需要所谓的条目处理管道池。网络爬虫框架仅持有条目处理管道的一个实例就足够了。

与条目处理管道有关的所有接口的声明都放到了goc2p项目的webcrawler/itempipeline代码包中。

4. 调度器

调度器属于控制模块而非处理模块。它需要对各个处理模块的运作进行检测和控制。可以说,调度器是网络爬虫框架的核心。正因为如此,我们需要让其提供相应的启动和停止爬取流程的方法。除此之外,出于监控整个流程的目的,我们还应该在这里为使用方提供一些获取实时状态或统计信息的方法。依照这样的思路,我们有了这样一个接口类型声明:

  1. // 调度器的接口类型。
  2. type Scheduler interface {
  3. // 启动调度器。
  4. // 调用该方法会使调度器创建和初始化各个组件。在此之后,调度器会激活爬取流程的执行。
  5. // 参数channelLen被用来指定数据传输通道的长度。
  6. // 参数poolSize被用来设定网页下载器池和分析器池的容量。
  7. // 参数crawlDepth代表了需要被爬取的网页的最大深度值。深度大于此值的网页会被忽略。
  8. // 参数httpClientGenerator代表的是被用来生成HTTP客户端的函数。
  9. // 参数respParsers的值应为分析器所需的被用来解析HTTP响应的函数的序列。
  10. // 参数itemProcessors的值应为需要被置入条目处理管道中的条目处理器的序列。
  11. // 参数firstHttpReq即代表首次请求。调度器会以此为起始点开始执行爬取流程。
  12. Start(channelLen uint,
  13. poolSize uint32,
  14. crawlDepth uint32,
  15. httpClientGenerator GenHttpClient,
  16. respParsers []anlz.ParseResponse,
  17. itemProcessors []ipl.ProcessItem,
  18. firstHttpReq *http.Request) (err error)
  19. // 调用该方法会停止调度器的运行。所有处理模块执行的流程都会被中止。
  20. Stop() bool
  21. // 判断调度器是否正在运行。
  22. Running() bool
  23. // 获得错误通道。调度器以及各个处理模块运行过程中出现的所有错误都会被发送到该通道。
  24. // 若该方法的结果值为nil,则说明错误通道不可用或调度器已被停止。
  25. ErrorChan() <-chan error
  26. // 判断所有处理模块是否都处于空闲状态。
  27. Idle() bool
  28. // 获取摘要信息。
  29. Summary(prefix string) SchedSummary
  30. }

接口类型SchedulerStart方法的作用就是启动调度器。它接受的参数并不少,共有7个。前6个参数都是被用于初始化网络爬虫框架中的各个组件的。而最后一个参数firstHttpReq则是我们在前面提到的首次请求。爬取流程的执行是由它来激活的。它会让网络爬虫框架对首个以及后续网页的下载、分析和处理流程真正地运转起来。

请读者注意Start方法的参数声明列表中的那个名为httpClientGenerator参数。该参数的类型是GenHttpClient。该类型是一个函数类型,其声明如下:

  1. // 被用来生成HTTP客户端的函数类型。
  2. type GenHttpClient func() *http.Client

可以看到,该类函数会生成HTTP客户端的实例。HTTP客户端实例是HTTP请求的发送操作和HTTP响应的接收操作的执行者,是网页下载器必备的组件。有了这个函数类型,我们就可以让网络爬虫框架的使用者自己决定怎样创建和初始化HTTP客户端实例。由于http.Client类型的所有字段都是公开的,所以我们可以在这样的函数中生成定制化程度很高的HTTP客户端实例。当然,为了保证网页下载子流程的正确执行,网络爬虫框架也应当提供一个默认的HTTP客户端实例生成方法(即GenHttpClient函数类型的默认实现)。

接口类型SchedulerStop方式的作用是停止调度器以及各个处理模块的运行。停止调度器是比较容易的,但是中止正在运行的各个处理模块就相对复杂了。这一功能的实现既需要做到统一,又应该充分分散。前者是说需要有统一的停止信号来标识整个流程的停止。后者指的是所有需要被停止的子流程都需要关注该信号并及时做出响应。这样才能保证爬取流程中的各个环节的状态的一致性。因此,停止操作的实现几乎涉及了网络爬虫框架的所有组件。我们在实现它的时候会进行详细的讨论。读者也可以先思考一下这一问题的解决方案。

有的读者可能混淆Scheduler接口类型的Running方法和Idle方法的功能。实际上,它们是在不同层次上了解爬取流程执行情况的方法。Running方法仅仅会返回一个代表了当前调度器是否正在运行(已被启动且还未被停止)的bool类型值。而Idle方法则被设计为可以深度检测网络爬虫框架的运行状况的方法。也就是说,它应该对爬取流程的执行情况进行更深层次的了解,比如,检测各个池的使用情况和条目处理管道的内部运作情况。Idle方法应该依据这些信息进行最终的判定,并把判定结果返回给该方法的调用方。请读者回顾一下,在前面讲到的那些处理模块的接口中,哪些方法能够为Idle方法的判定提供依据?

再来说Scheduler接口类型的ErrorChan方法。我们可以调用该方法以获得一个元素类型为error的接收通道。关于这个接收通道的创建过程和其中的元素值的来源,我们会稍后在讲解中间件的相关接口的时候予以介绍。在这里,我们只需要知道从这个接收通道中可以接收到在执行爬取流程的过程中发生的错误的值即可。

最后,Scheduler接口类型的Summary方法有着与ItemPipeline接口类型的Summary基本一致的功能。但不同的是,后者返回的是一个string类型值,而前者返回的则是一个SchedSummary类型的值。SchedSummary类型是专门被用来提供调度器的摘要信息的类型。该类型的声明如下:

  1. // 调度器摘要信息的接口类型。
  2. type SchedSummary interface {
  3. String() string // 获得摘要信息的一般表示。
  4. Detail() string // 获取摘要信息的详细表示。
  5. Same(other SchedSummary) bool // 判断是否与另一份摘要信息相同。
  6. }

其中,String方法是标准的被用来返回类型实例的字符串表示形式的方法。而Detail方法则可以被看作是String方法的增强版。它返回的字符串中会包含更加详细的信息。最后,Same方法被用来判断当前的类型实例与另一个是否相同。在后面我们会看到,这个方法可以为判断打印调度器摘要信息的必要性提供依据。

以上就是所有的与调度器有关的公开的接口。虽然调度器的行为定义看起来如此简单,但实现这些行为却是需要进行很多考量的。相比接口,调度器的实现更加体现了我在设计网络爬虫框架的调度和监控等功能方面的思考。在调度器的实现中,我们使用了很多被称为网络爬虫框架的中间件的工具。从这些工具的设计上,读者也可以更加清晰地了解到网络爬虫框架的运作模式。我们马上就会讲到它们。

与调度器有关的所有接口的声明与它们的实现一起都放在了goc2p项目的webcrawler/scheduler代码包中。不过,其中使用到的中间件工具处于另一个代码包中。

5. 中间件接口概述

从上一节展示的图9-2中,我们可以看到,网络爬虫框架的中间件起到了承上启下的作用。其中的工具都是调度器对各个处理模块进行调度和监控的有力辅助。具体来讲,这些工具包括:ID生成器、通道管理器、实体池和停止信号。

我们在前面讲述网页下载器的接口类型的时候,已经对ID生成器的接口类型IdGenerator进行过介绍。下面我们重点描述与其他3个工具相关的接口类型。

6. 通道管理器

调度器的职责之一就是在各个处理模块之间传递数据。关于各类型数据的流转方向,读者可以回顾9.2节中展示的图9-1。通过对Go语言的并发编程方式的了解,我们实现这样的功能的时候,肯定会首先想到通道(Channel)类型。实际上,使用通道在各个处理模块之间传递数据是一个最佳方案。其原因不言而喻。由于需要传递的数据有多个种类,我们需要的通道也会有不同的类型。更具体地说,我们会用到的通道的类型共用4个,它们分别对应了请求、响应、条目和错误这4类数据。

通道管理器正是被用来管理上述4类通道的工具。通道管理器只会关心并管理通道,而不会关心网络爬虫框架中的任何流程的执行过程。这从它的接口类型声明上也可以看得出来:

  1. // 通道管理器的接口类型。
  2. type ChannelManager interface {
  3. // 初始化通道管理器。
  4. // 参数channelLen代表通道管理器中的各类通道的初始长度。
  5. // 参数reset指明是否重新初始化通道管理器。
  6. Init(channelLen uint, reset bool) bool
  7. // 关闭通道管理器。
  8. Close() bool
  9. // 获取请求传输通道。
  10. ReqChan() (chan base.Request, error)
  11. // 获取响应传输通道。
  12. RespChan() (chan base.Response, error)
  13. // 获取条目传输通道。
  14. ItemChan() (chan base.Item, error)
  15. // 获取错误传输通道。
  16. ErrorChan() (chan error, error)
  17. // 获取通道长度值。
  18. ChannelLen() uint
  19. // 获取通道管理器的状态。
  20. Status() ChannelManagerStatus
  21. // 获取摘要信息。
  22. Summary() string
  23. }

我们可以看到,接口类型ChannelManager中的大部分方法的功能都是获取某类实例或信息。其中有4个方法是分别被用来获取通道管理器中的某一个类型的通道的。这4个方法的结果声明列表中除了相应的通道类型之外,还都包含了一个error类型。这第二个结果的值在哪些情况下会是非nil的呢?读者可以在它们的实现中找到答案。

接口类型ChannelManager的方法Init的功能是对当前的通道管理器进行初始化。在对其初始化的过程中,我们势必要同时对其容纳的各类通道进行初始化。这就需要我们给出这些通道的初始长度。该初始长度由参数channelLen代表。另外,参数reset使得通道管理器可以被强行地初始化,即使它已经被初始化过。这样,我们就从接口层面上允许了对通道管理器中的各类通道及其长度的重新设定。

通道管理器的Close方法的作用是关闭通道管理器以及其中的所有通道实例。如果发现该通道管理器已被关闭,那么该方法应该立即返回并将false作为结果值。

我们应该专门设计出一套针对通道管理器的状态标识,以便在其内部和外部都能够实时、清晰地了解到它的状况。为此,就有了这样一个类型以表示通道管理器状态的值:

  1. // 被用来表示通道管理器的状态的类型。
  2. type ChannelManagerStatus uint8

对于ChannelManagerStatus类型的值,我们预设了3个:

  1. const (
  2. CHANNEL_MANAGER_STATUS_UNINITIALIZED ChannelManagerStatus = 0 // 未初始化状态。
  3. CHANNEL_MANAGER_STATUS_INITIALIZED ChannelManagerStatus = 1 // 已初始化状态。
  4. CHANNEL_MANAGER_STATUS_CLOSED ChannelManagerStatus = 2 // 已关闭状态。
  5. )

这3个常量分别代表了通道管理器的不同状态。CHANNEL_MANAGER_STATUS_UNINITIALIZED表示通道管理器还未被初始化。只要我们没有调用过通道管理器的Init方法,它就会处于这种状态。与之对应,我们对Init方法的初次调用会使该通道管理器的状态转变为CHANNEL_MANAGER_STATUS_INITIALIZED。在这之后,我们再次调用它的Init方法就并不一定会使之状态发生改变了。这由通道管理器当时的状态以及我们传递给该方法的第二个参数值决定。最后,如果通道管理器已被初始化,那么调用它的Close方法必定会使其状态转变为CHANNEL_MANAGER_STATUS_CLOSED

除了通道管理器的接口类型所表示出的功能需求之外,我们要求它的方法InitClose以及其他不会改变通道管理器内部状态的方法都应该是并发安全的。这就需要我们在实现它们的时候考虑使用适合的同步工具加以保证。关于这一方面的内容,我们以后再说。

顺便提一下,与网络爬虫框架的中间件有关的所有声明代码分别存放到了goc2p项目的webcrawler/middleware代码包的各个源码文件中。这包括我们稍后会讲到的实体池和停止信号。

7. 实体池

我们已经知道,针对网页下载器池和分析器池的接口之间非常地相似。这也意味着这两者在行为和功能上基本一致。显然,出于避免重复代码和抽象共有特性的目的,我们应该编写一个可以被公用的池。由于池中一般会存放若干个同一个类型的实体(在Go语言中,称为值),因此我们给予它一个专有名称——实体池。下面是为它声明的接口类型:

  1. // 实体池的接口类型。
  2. type Pool interface {
  3. Take() (Entity, error) // 取出实体。
  4. Return(entity Entity) error // 归还实体。
  5. Total() uint32 // 实体池的容量。
  6. Used() uint32 // 实体池中已被使用的实体的数量。
  7. }

该类型中的4个方法的声明与之前讲到的那两个池的基本一致。不过,其中的Entity类型是我们专门为它声明的。

作为一个可以被公用的池,我们应该尽量允许任意类型的元素(当然,一个实体池中的所有元素的类型应该是一致的)。又由于Go语言并不支持自定义泛型,所以我们首先想到的是使用interface{}类型作为相关方法的参数或结果的类型:

  1. Take() (interface{}, error)
  2. Return(interface{} Entity) error

我们都知道,空接口类型interface{}的变量可以被赋予任何类型的值。不过,我们还是希望池中的每个元素值都能够提供唯一标识自己的方法。不知读者是否还记得,我们刚刚讲过接口类型PageDownloaderAnalyzer都包含了这样一个方法:

  1. Id() uint32 // 获得ID。

这两个类型的值都会以此种方式唯一地标识自己。既然我们主要会用实体池来存放网页下载器或分析器,那么我们就依照它们的ID获取方法来声明Entity接口类型:

  1. // 实体的接口类型。
  2. type Entity interface {
  3. Id() uint32 // ID的获取方法。
  4. }

这样一来,我们在不变动之前编写好的PageDownloader类型和Analyzer类型的声明的情况下就可以让它们成为Entity的扩展接口类型了。

这里预先透露一点实现细节。对于实体池的实现,我们可以参考之前讲过的Goroutine票池。读者可以先思考一下怎样去做。另外,与通道管理器一样,实体池中的各个方法也应该具有并发安全性。

8. 停止信号

我们在第7章讲述过一个载荷发生器的实现过程。在代表载荷发生器实现的那个结构体类型中有这样一个字段声明:

  1. stopSign chan byte // 停止信号的传递通道。

字段stopSign是一个简单明了的停止信号传输通道。正因为有了这样一个通道,针对载荷发生器的停止流程才得以及时、正确地执行。

不过,我们在网络爬虫框架中使用如此简单的结构来传递停止信号是不合适的。主要原因是,停止信号的发送者应该并不需要关心会有多少个接收者。如果使用上述通道来传递停止信号,那么发送者是无法确定应该向该通道发送几个停止信号的。即使我们在实现了所有处理模块之后可以将这个数量固定下来,也会为今后可能的代码修改埋下隐患。换句话说,一旦我们修改了某个或某些处理模块中被用来处理停止信号的代码,就可能需要重新计算发送方应该发送的停止信号的数量。只要这个数量出现偏差,就非常有可能会使网络爬虫框架在执行停止流程的时候出现问题。因此,我们需要建立一个“一方发送、多方接收”的灵活的停止信号传递机制。这是出于今后对网络爬虫框架扩展的考虑。同时,这样做也可以让我们在编写各种处理停止信号的代码的时候不受任何约束。

为了建立一个灵活的停止信号传递机制,我们编写了下面这个接口类型声明:

  1. // 停止信号的接口类型。
  2. type StopSign interface {
  3. // 置位停止信号。相当于发出停止信号。
  4. // 如果先前已发出过停止信号,那么该方法会返回false。
  5. Sign() bool
  6. // 判断停止信号是否已被发出。
  7. Signed() bool
  8. // 重置停止信号。相当于收回停止信号,并清除所有的停止信号处理记录。
  9. Reset()
  10. // 处理停止信号。
  11. // 参数code应该代表停止信号处理方的代号。该代号会出现在停止信号的处理记录中。
  12. Deal(code string)
  13. // 获取某一个停止信号处理方的处理计数。该处理计数会从相应的停止信号处理记录中获得。
  14. DealCount(code string) uint32
  15. // 获取停止信号被处理的总计数。
  16. DealTotal() uint32
  17. // 获取摘要信息。其中应该包含所有的停止信号处理记录。
  18. Summary() string
  19. }

接口类型StopSign共包含了7个方法。先说前3个方法。停止信号的发出方调用Sign方法以发出信号。当然,这个发出的动作不一定会成功。因为,如果Signed方法的返回值为true,那么就说明停止信号已被发出。在这之后的重复的停止信号发出动作都不会成功。如果发出方想强行地再次发出停止信号,那么只能先调用Reset方法以重置停止信号,而后再次调用Sign方法。注意,Reset方法不但会重置停止信号本身,还会清除此前所有的停止信号处理记录。它的功能相当于对其所属的StopSign类型值重新进行初始化。

方法SignReset是专供给停止信号的发出方使用的。而Signed方法则可以被任何一方使用。停止信号的处理方可以适时地通过调用Signed方法以判断信号是否已被发出。如果停止信号已发出,那么处理方就需要及时停止当前流程并进行适当的善后处理。在处理完成后,停止信号的处理方应该调用该停止信号的Deal方法,以表示已经对该信号处理完毕。这样做完全是出于统计和监控的目的。注意,Deal方法应该只在停止信号已被发出且未被重置的情况下才会进行相应的操作。而在其他情况下,此操作应该被忽略。

从调度器的职责以及它与各个处理模块之间的关系来看,内部的停止信号理应由调度器发出。接口类型StopSign的声明中的最后3个方法会对调度器执行监控处理模块的任务有很大帮助。方法DealCountDealTotal分别被用来获取单一处理方或所有处理方对当前停止信号的处理计数。而Summary方法则可以生成并返回一个综合性的摘要信息,其中包含了全部的处理计数信息。

我们会在讲述StopSign接口类型的实现类型的时候看到,网络爬虫框架中的停止信号处理机制与先前的载荷发生器在这方面的实现方式截然不同。它们使用的被用来保证并发安全的方法也互不相干。这充分说明了,没有任何一个程序设计方案可以被称为银弹。即使对于像通道这样的并发编程利器来说也是这样。

在本小节,我们讲述了网络爬虫框架中的所有公开的接口类型。这些接口类型共同描绘出了网络爬虫框架的总体架构和行为模式。不过,我们在实现这些接口类型的时候仍然会有很多工作要做。在这之中也会涉及对各种设计方案的抉择和取舍。