7.3 实战演练——载荷发生器

我们用上一章和本章的大部分篇幅讲解了当今主流的并发编程方式,以及Go语言并发编程模型的来龙去脉。现在,终于到了利用这些知识动手编写一个完整的、可以实际应用的并发程序的时候了!

作为经历过全周期的软件项目(尤其是互联网软件项目)的开发者而言,肯定不止一次地有过这样的需求:在开发完成一个可运行的软件并且通过基本的功能测试之后,我们会非常急迫地想获得这个软件的性能数值。换句话说,我们总是需要尽早地对软件进行性能评测。之所以有这样的需求,是因为我们在正式使用该软件之前往往需要搞清楚下面这几个问题。

  • 这个软件到底能跑多快?

  • 在高负载的情况下,该软件是否还能保持正确性。或者说,载荷数量与软件正确性之间的关系是怎样的?

  • 在保证一定的正确性的条件下,该软件的可伸缩性是怎样的?

  • 在软件可以正常工作的情况下,负载与系统资源(包括CPU、内存和各种I/O资源,等等)使用率之间的关系是怎样的?

只要为这些问题找到了答案,我们就能够真正地了解到软件的性能。也只有这样,我们才会知道需要进行怎样的软件设计,以及提供怎样的系统资源,才能够让它在承受一定量的载荷的同时保证正确性。这也是我们分析并定位软件的性能瓶颈所需的重要参考资料。其中,这个载荷的量应该是我们在对性能评测所得到的一系列数值进行统计和分析后得出的。通过对这些数值的掌握,我们也可以了解到软件在给定的运行环境下的性能。而正确性的比率则应该体现(或者说满足)软件使用者(客户端软件的开发者或者终端用户)对软件的刚性需求。所谓刚性需求,就是关乎软件质量和使用者体验的硬性指标,是软件必须要满足的需求。

我们在本节将要编写的载荷发生器可以被作为软件性能评测的辅助工具。它可以向被测软件发送指定量的载荷,并记录下被测软件处理载荷的结果。这样,我们就可以统计出被测软件在给定的使用场景下的性能数值了。我们编写的载荷发生器应该具有优良的可控性和可扩展性,同时还应该能够输出内含丰富的结果。为了做到这几点,我们应该首先对它的输入、输出和基本结构进行一番设计。

7.3.1 参数和结果

一个程序的输入、输出以及二者之间的对应关系往往可以很好地体现出该程序的功能。在本小节,我们就从这方面着手对载荷发生器进行设计。

1. 重要的参数

为了编写这样一个工具,我们需要先了解一下几个必须且重要的参数。这些参数可以帮助我们营造出一个有利于软件性能评测的负载环境。

首先,一个软件在给定的运行环境下最多能够被多少个用户同时使用总是我们需要获悉的。这就是说,我们在进行性能测试的时候一般需要给定在同一时刻(或在某一个时间段内)向软件发送载荷的数量。在这一方面存在两个专业术语,它们是QPS(Query Per Second,每秒查询量)和TPS(Transactions Per Second,每秒事务处理量)。这两者都是体现服务器软件的性能的指标,其含义都是在一秒钟之内可以正常响应请求的数量的平均值。不同的是,前者针对的是对服务器上的数据的读取操作,而后者针对的则是对服务器上的数据的写入和修改操作。由于载荷的多样性,我们不打算在载荷发生器中区分这两个性能指标。但是,作为载荷发生器的使用者需要明确,在针对软件的某个种类的API进行测试的时候,得出的结果对应的应该是哪一个性能指标。

我们在这里可以把载荷和请求归为同一个事物。它们都代表了软件使用者为了获得某种结果而向为之服务的软件发送的一段数据。我们把每秒发送的载荷的数量(以下简称每秒载荷量)作为参数,其意义是控制载荷发生器向软件发送载荷(或称请求)的频率。这样,我们就可以控制被测软件在一段时间之内的负载情况了。

其次,软件在承受一定量的载荷的情况下对系统资源的消耗也是应该值得我们特别关注的。这与软件的可靠性息息相关。打个比方来说,有两个服务器软件A和B。经性能评测,A的QPS是2000,B的QPS是2200。但是由于B对系统资源的消耗较大以及对系统资源释放的不及时,导致其在接受每秒2000个载荷并持续了200个小时之后宕机了。但是A在接受同样的负载的情况下可以无故障地运行300小时。我们说,虽然B的部分性能数值更佳,但是其可靠性应该是比A差的。虽然实际的软件可靠性还需要通过一些专门的方法去衡量并且与软件的实时性能并没有直接的关系,但是我们还是应该积极地了解软件在持续接受一定量的载荷的情况下能够无差错地运行多久(或者说它可以持续地为并发的请求服务多长时间)。通过明确的设定持续发送载荷的时间(也可称为负载持续时间),我们就可以了解到这个时间段内软件性能的变化,同时也有机会使用一些方法获得软件对系统资源的使用情况,并以此推断出软件对各种系统资源的依赖情况以及它们与软件性能之间的关系。这也有助于我们查找软件内部可能存在的设计缺陷。

我们需要了解的第三个参数是评判软件正确性的一个重要标准。这个参数就是载荷的处理超时时间(以下简称处理超时时间),即我们可接受的从向软件发出请求到接收到软件返回的响应的最大耗时。设置处理超时时间可以让我们更加精确地计算出在给定载荷量以及持续时间的情况下软件的正确性比率。处理超时与软件处理载荷出错和响应内容错误一样,也表示了软件的不正确。同时,它也应该是我们关心的软件性能指标之一。它与载荷的量和持续时间之间都存在着一定的关联。例如,在我曾经所属的互联网软件开发团队中有这样一条硬性的软件系统性能要求:对于面向终端用户的所有API,其处理超时时间都是200毫秒。如果某个API在承受不高于最高载荷量的80%的负载的情况下造成了处理超时,那么这个API在性能上就是不合格的。这就强迫我们在各个方面(包括但不限于API设计、处理流程设计和数据缓存设计)都要仔细斟酌。比如,若某API持续承受高负载的时间比例过大(如一天中有12个小时连续承受着较高负载),那么我们就应该考虑该API的设计是否合理、软件系统是否需要再被拆分,甚至与之关联的其他系统是否存在问题。

我们在初始化载荷发生器的时候就应该给定上述3个参数,即每秒载荷量、负载持续时间和处理超时时间。载荷发生器应该根据这些参数自行计算出载荷发生以及发送的频率,并控制好并发量。

2. 输出的结果

载荷发生器的输出应该有助于我们统计、分析和汇总出软件在承受给定负载的情况下所表现出的各个性能数值,以及像软件可以承受的最大载荷量以及它可以持续承受一定载荷量的最长时间这样的极限值。据此,我们应该在针对某一个载荷的结果中至少包含三块内容,即请求和响应的内容、响应的状态以及请求(或者说载荷)处理耗时。其中,请求和响应的内容让我们可以精细地检查响应内容的正确性。响应的状态则应该反映出处理此请求的过程中的绝大多数问题,而不仅仅是成功或失败那么简单。至于请求处理耗时,则需要真实地体现从向软件发送请求到接收到软件的响应的精准耗时,并且不应该夹杂任何其他操作的进行时间。

对于每一个载荷所产生的结果来说,都应该至少包含上述3块内容。那么,载荷发生器的输出就应该是按照响应的到达顺序排列的一个结果列表。根据这些结果,我们就可以计算出软件每秒处理载荷的数量(以下简称每秒载荷处理量)。每秒载荷处理量一定小于或等于我们设定的每秒载荷量。软件在处理某些载荷的时候可能会出错、失败或超时。

7.3.2 基本结构

在做好足够的功课之后,我们就可以开始动手编写载荷发生器了。首先,根据前面的分析和设计,我们要确定载荷发生器的基本结构。我们应该用结构体类型声明来表示它的基本结构。在这个声明中,肯定应该包含我们在前面提到的那3个重要的参数:

  1. timeoutNs time.Duration // 响应超时时间,单位:纳秒。
  2. lps uint32 // 每秒载荷发送量。
  3. durationNs time.Duration // 负载持续时间,单位:纳秒。

其中两个表示时间的字段的类型均是time.Duration。这是为了方便设定超时和定时任务。而lps即是Loads per second的缩写。这沿用了QPS和TPS的命名规则。

我们在前面说过,负载发生器的输出应该是一个结果列表。但是,我们在这里不应该使用数组或切片作为收纳结果的容器。原因是,负载发生器是并发的发送载荷的。这意味着软件处理载荷的结果也会并发地被添加到列表中。我们已经知道,数组和切片本身都不是并发安全的。Go语言原生的数据类型中只有通道是并发安全的,并且可以同时由任意个Goroutine使用。它是收纳结果的绝佳容器。因此,我们将这样一个通道类型的字段也加入到载荷发生器的类型声明中:

  1. resultCh chan *lib.CallResult // 调用结果通道。

其中,lib.CallResult是一个专为处理结果声明的数据类型。“Call”的意思是调用,它象征着我们对软件的API的调用。这个调用无所谓是本地的还是远程的。因此,我们也可以称针对单个载荷的处理结果为调用结果。调用结果的类型声明被放在了代码包loadgen/lib中。

这里需要特别说明一下,所有与载荷发生器有关的代码都被会放到loadgen代码包及其子包中。因此,为了使这些标识符看起来简短一些,我们在后面的内容中会省略掉在这些代码包中声明的程序实体的部分限定符。例如,类型loadgen/lib.CallResult会被简写成lib.CallResult。实际上,这也符合Go语言的限定标识符的书写规则。

回到正题,lib.CallResult类型的基本结构如下:

  1. // 调用结果的结构。
  2. type CallResult struct {
  3. Id int64 // ID。
  4. Req RawReq // 原生请求。
  5. Resp RawResp // 原生响应。
  6. Code ResultCode // 响应代码。
  7. Msg string // 结果成因的简述。
  8. Elapse time.Duration // 耗时。
  9. }

lib.CallResult结构体的类型声明中,包含了我们前面所说的那3块内容。字段ReqResp分别代表了请求内容和响应内容,字段CodeMsg被用来描述响应的状态,而字段Elapse则被用于表明请求处理耗时。最后,字段Id被用来标识和区分调用结果。

我们已经看到,在上述类型声明中又包含了两个自定义的类型,即lib.RawReqlib.RawResp。它们的声明如下:

  1. // 原生请求的结构。
  2. type RawReq struct {
  3. Id int64
  4. Req []byte
  5. }
  6. // 原生响应的结构。
  7. type RawResp struct {
  8. Id int64
  9. Resp []byte
  10. Err error
  11. Elapse time.Duration
  12. }

这两个类型的声明中也都包含了Id字段。对于同一个载荷而言,其相关的请求、响应和调用结果中的Id字段的值都应该是一致的。这对于我们了解处理载荷的全过程会很有帮助。

类型lib.RawReqReq字段的作用是容纳原生请求的数据。我们知道,数据的最底层的表现形式就是一个或多个字节。因此,在这里,我们使用[]byte来作为Req字段的类型。相比之下,lib.RawResp类型中除了Id和代表原生响应数据的Resp字段之外,还包含了ErrElapse字段。Err字段的值会体现在发送请求和接收响应的过程中(或者说在调用被测软件的API的过程中)发生的错误。如果没有发生错误,那么这个字段的值将会是nil。而Elapse字段则被用来表示这个过程的耗时,单位是纳秒。注意,lib.CallResult类型中的Elapse字段的含义与这是一致的。

现在,我们回到载荷发生器的基本结构上来。前面说到,它的resultCh字段的类型是chan *lib.CallResult。由于结构体类型的零值不是nil,所以如果这个通道的元素类型是lib.CallResult的话,就会给我们对其中的元素值的零值判断带来一些困扰。我们使用它的指针类型作为通道的元素类型,既可以消除这种困扰,也可以省去因元素值复制而带来的一些开销。

载荷发生器的类型目前已经拥有代表了必需的参数和被用来收纳调用结果的容器的4个字段。不过,这样显然是不够的。因为我们还需要一些内部字段来对整个过程进行控制,并且还要使载荷发生器具有良好的可扩展性。

我们为载荷发生器指定的响应超时时间和每秒载荷发送量,应该能够作为它生成和发送载荷的频率的依据。也就是说,我们应该可以根据这两个参数的值计算出具体的并发量,并以此指导实际的载荷发送操作。那么这个计算得出的并发量应该被放在哪儿?最好的存放位置当然是载荷发生器的结构内部。因此,我们在其中又声明了这样一个字段:

  1. concurrency uint32 // 并发量。

一旦有了并发量的具体数值,我们就有了控制载荷发生器使用系统资源的依据。另外,作为一个Go语言的并发程序,它使用Goroutine的个数应该是我们关心的。过少的Goroutine数量会使程序的并发程度不够,从而导致程序不能充分地利用系统资源。而过多的Goroutine数量则可能会使程序的性能不升反降。因为这对于Go语言运行时系统及其依托的操作系统来说都会造成过重的负担。那么怎样合理地控制程序所启用的Goroutine的数量呢?

我们在讲非缓冲通道时举过一个示例,其中提到了一个控制Goroutine数量方法。这涉及一个名为goTicket的变量和一个名为initGoTicket的函数。该函数初始化了一个Goroutine票池。这个Goroutine票池以一个缓冲通道作为载体,并由goTicket变量代表。在那个示例中,这个Goroutine票池处于一种尚未被封装的状态。而在本节的示例中,我们将把它以及相关的操作封装在一个结构体类型中。这个结构体类型是接口类型GoTickets的一个实现。它们的声明都被放在了loadgen/lib代码包中。我们将在下一小节对它们进行展示和讲解。

既然有了这样一个Goroutine票池,我们就会在载荷发生器的相关方法中使用到它。因此,我们也应该把它作为载荷发生器结构中的一个字段:

  1. tickets lib.GoTickets // Goroutine票池。

载荷发生器良好的可控性不应仅仅体现在这一个方面。在载荷发生器运行的过程中,我们应该可以随时停止它。为此,我们应该着手设计被用来传递停止信号的方式。我们说过,在不同的Goroutine之间传递数据最好的方式就是通过通道。而由于载荷发生器在一开始就是被作为并发程序创建的。因此,我们在其结构中添加这样一个字段:

  1. stopSign chan byte // 停止信号的传递通道。

由于被传递的数据只是一个“信号”而已,所以我们应该把该通道的元素类型设定为一个占用空间最小的类型。在Go语言中,bool类型和byte类型都是占用空间最小的基本类型。它们的单值都只会占用一个字节的空间。不过,byte类型比bool类型更加灵活一些。因为它可能的值不只两个。这会对我们今后可能的扩展有好处。据此,我们才选用byte类型作为stopSign通道的元素类型。

至此,我们向载荷发生器的结构中又加入了3个起到控制作用的字段。不过,这还不算完。既然载荷发生器可以有不止一种的状态,那我们理应再为它添加一个状态字段。状态字段的类型可以是数值类型。但是,为了让它的值与普通的数值有所区别,我们专门为此声明了一个类型:

  1. // 载荷发生器的状态的类型。
  2. type GenStatus int

可以看到,GenStatus类型只是int类型的一个别名类型而已。另外,为了让载荷发生器的状态更有字面意义,我们还声明了3个GenStatus类型的常量:

  1. const (
  2. STATUS_ORIGINAL GenStatus = 0
  3. STATUS_STARTED GenStatus = 1
  4. STATUS_STOPPED GenStatus = 2
  5. )

有了它们,我们设置和判断载荷发生器的状态的时候就方便多了。GenStatus类型和这几个常量的声明也都被放到了loadgen/lib代码包中。

有了lib.GenStatus类型,我们就可以为载荷发生器添加状态字段了:

  1. status lib.GenStatus // 状态。

最后,不要忘记,我们想让载荷发生器具有良好的可扩展性。我们应该让它的使用者可以根据具体需求对它进行适当的扩展和定制。为了达到这个目的,我们应该预先在其结构中添加一个字段,并以此作为载荷发生器的扩展接口。

不过,在添加这个字段之前,我们应该先搞清楚载荷发生器需要提供哪些扩展。首先,载荷发生器的核心功能肯定是控制和协调请求的生成和发送、响应的接收和验证,以及最终结果的递交等一系列的操作。既然由它来进行对这些操作的控制和协调,那么有些具体的操作是否就可以由可定制的组件来做呢?这样我们就可以把核心功能与可扩展的功能(或者说可作为组件的功能)区分开了。并且,如此一来,既可以保证核心功能的稳定,又可以提供较高的可扩展性。

我们已经确定了一定要作为核心功能的部分。现在我们来看看有哪些被控制和协调的操作可以作为组件功能。显然,我们不知道或者无法预测到被测软件提供API的形式。况且,载荷发生器不应该对此有所约束,它们可以是任意的。因此,与调用被测软件的API有关的功能应该被作为组件功能,这涉及请求的发送操作和响应的接收操作。据此,既然我们组件化了调用被测软件API的功能,那么请求的生成操作和响应的检查操作也都肯定无法由载荷发生器本身来提供。

根据上面的分析,我们编写出了这样一个接口类型来体现可被组件化的功能:

  1. // 调用器的接口。
  2. type Caller interface {
  3. // 构建请求。
  4. BuildReq() RawReq
  5. // 调用。
  6. Call(req []byte, timeoutNs time.Duration) ([]byte, error)
  7. // 检查响应。
  8. CheckResp(rawReq RawReq, rawResp RawResp) *CallResult
  9. }

虽然说被作为了非核心功能,但是该接口类型中的那几个方法所代表的操作也都是载荷发生器在运行过程中不可或缺的。在执行测试流程的过程中,它们肯定需要被用到。因此,我们应该在初始化载荷发生器的时候给定一个lib.Caller接口类型的实现值。并且,在载荷发生器的结构中也应该存在一个该类型的字段,以存放我们在初始化时给定的那个实现值。

到这里,我们根据一系列准备和设计而编写的载荷发生器的结构体就完成了。表示其结构的完整代码如下:

  1. // 载荷发生器的实现。
  2. type myGenerator struct {
  3. caller lib.Caller // 调用器。
  4. timeoutNs time.Duration // 响应超时时间,单位:纳秒。
  5. lps uint32 // 每秒载荷发送量。
  6. durationNs time.Duration // 负载持续时间,单位:纳秒。
  7. concurrency uint32 // 并发量。
  8. tickets lib.GoTickets // Goroutine票池。
  9. stopSign chan byte // 停止信号的传递通道。
  10. status lib.GenStatus // 状态。
  11. resultCh chan *lib.CallResult // 调用结果通道。
  12. }

这个代表了载荷发生器的结构体类型myGenerator被放在loadgen代码包中。 在它的结构中,共包含了9个字段。其中,限定标识符lib.Caller表明,该接口类型也被放在了loadgen/lib代码包中。读者可能会有个疑问:代表载荷发生器的结构体类型myGenerator为什么是包级私有的?难道我们不想让loadgen包之外的程序使用它吗?关于这个问题,我们稍后再做解释。

7.3.3 初始化

在完成基本结构的编写之后,我们就要开始考虑载荷发生器应该以怎样的方式被初始化了。是直接通过复合字面量,还是用其他更好的方式?

对于简单、直接的结构体来说,使用复合字面量来初始化肯定会是首选。但是对于稍微复杂一些的结构体来说,只是简单地为其中的字段赋予相应的值就不能算是充分地初始化了。因为,这样的结构体会对字段的值有所要求,并且有些字段的值是需要通过一些计算步骤才能给出的。我们在上一小节声明的载荷发生器的结构体类型myGenerator就属于这种情况。

在Go语言中,一般会使用一个函数来创建和初始化较复杂的结构体。这类函数的名称通常会以“New”作为前缀,然后后跟相关的名称,像这样:

  1. func NewMyGenerator() *myGenerator

依据面向接口编程的原则,我们不应该直接将myGenerator*myGenerator作为上述函数的结果类型。因为这样会使该函数与具体的结构体类型紧密地绑定在一起。如果我们要修改该结构体类型的名称或者完全更换一套载荷发生器的实现,那么调用该函数的所有代码都不得不经历被动的变化,这会造成散弹式的修改。我们应该尽力避免此类情况的发生。因此,让这类初始化函数返回一个接口类型的结果是很有必要的。这个接口类型应该可以充分地体现出载荷发生器的行为。声明这个接口类型相当容易。为了使载荷发生器具有易用性,我只需让它暴露寥寥几个方法。请看下面的这个接口类型声明:

  1. // 载荷发生器的接口。
  2. type Generator interface {
  3. // 启动载荷发生器。
  4. Start()
  5. // 停止载荷发生器。
  6. // 第一个结果值代表已发载荷总数,且仅在第二个结果值为true时有效。
  7. // 第二个结果值代表是否成功将载荷发生器转变为已停止状态。
  8. Stop() (uint64, bool)
  9. // 获取状态。
  10. Status() GenStatus
  11. }

方法Start当然是需要的。我们用它来启动载荷发生器。Stop方法是为了实现我们之前所说的载荷发生器的可控性而存在的。我们应该可以在任何时候停止载荷发生器的运行。该方法的两个结果值分别代表已发载荷总数和停止操作的成功与否。通常情况下,该操作总是会成功的。但是,在载荷发生器还未被启动的情况下,调用Stop方法肯定不会起到什么作用。这时,该方法的第二个结果值就应该是false。至于Status方法,应该真实地反映出载荷发生器的当前状态。该方法的结果类型之所以是GenStatus而不是lib.GenStatus,是因为接口类型Generator的声明与GenStatus类型的声明被放在了同一个代码包中。

好了,在有了这样一个接口之后,那个被用于创建和初始化载荷发生器的函数的声明应该改变为:

  1. func NewGenerator() lib.Generator

我们打算让myGenerator类型成为接口类型lib.Generator的一个实现类型。这需要我们为myGenerator类型编写出与lib.Generator接口中声明的方法一一对应的3个方法。读者会在后面陆续看到这一实现过程。

显然,NewGenerator函数的声明还不完善。为了实现其功能,我们还应该为它添加若干个参数声明。因此,NewGenerator函数的第二版声明应该是这样的:

  1. func NewGenerator(
  2. caller lib.Caller,
  3. timeoutNs time.Duration,
  4. lps uint32,
  5. durationNs time.Duration,
  6. resultCh chan *lib.CallResult) (lib.Generator, error)

该函数的5个参数的名称、类型和含义分别与myGenerator类型中的相应字段对应。在这个函数中,我们会根据它们的值对myGenerator类型进行充分的初始化。不过,在这之前,我们应该先对这几个参数的值的有效性进行检查,并在检查不通过时向函数调用方告知错误情况。这也是我们为该函数添加第二个结果声明的原因。如果此处的检查通过了,我们就需要依据这几个参数值创建并初始化一个myGenerator类型的值。当然,使用复合字面量是必须的,就像下面这样:

  1. gen := &myGenerator{
  2. caller: caller,
  3. timeoutNs: timeoutNs,
  4. lps: lps,
  5. durationNs: durationNs,
  6. stopSign: make(chan byte, 1),
  7. status: lib.STATUS_ORIGINAL,
  8. resultCh: resultCh,
  9. }

我们使用复合字面量创建了一个myGenerator类型的值,同时对其中的7个字段进行了赋值。然后,我们把该值的指针赋给了变量gen。还记得吗?在我们的设计中,myGenerator类型会是接口类型lib.Generator的一个实现类型。所以,我们需要把gen的值作为NewGenerator函数的第一个结果值。这样能够被编译通过的前提是,myGenerator类型的方法集合中要包含lib.Generator接口类型中声明的那3个方法。我们会在下一小节展示*myGenerator类型的这3个公共方法的具体实现。

经过上述步骤之后,我们已经对载荷发生器中的7个字段进行了检查和赋值。不过不要忘了,仍有两个字段未被初始化。它们是concurrency字段和tickets字段。concurrency字段的值应该代表相关调用过程的并发执行数量。一个调用过程总体上包含了两个操作。一个是向被测软件发送一个载荷(或者说对被测软件的API进行一次调用)的操作,另一个是等待并从被测软件那里接收一个响应(或者说等待并获取被测软件的API返回的此次调用的结果)的操作。换句话说,一个调用过程即代表了载荷发生器通过一个载荷与被测软件进行的一次交互。因此,这一过程的并发执行数量可以比较真实地反映出被测软件的负载程度。

调用过程的并发执行数量(以下简称并发量)需要根据timeoutNs字段和lps字段的值以及一个公式计算得出。这个公式如下:

并发量 ≈ 单个载荷的响应超时时间 / 载荷的发送间隔时间

在此公式中,单个载荷的响应超时时间即是timeoutNs字段的值所表示的时间。一旦一个载荷发送操作的耗时达到了响应超时时间,相应的载荷就会被判定为没有被成功响应的载荷。在这之后,载荷发生器不会再去等待该载荷的响应。反过来讲,在达到这个响应超时时间之前,接收该载荷响应的操作是不会被强制结束的。假设响应超时时间是5秒钟,并且所有载荷响应接收操作都会用满这个时间,那么在此时间范围内开始的所有的载荷响应接收操作都会被并发地执行。也就是说,如果我们在5秒钟之前开始计数并在达到响应超时时间的此刻停止计数,那么在这个5秒钟的时间范围之内开始的载荷响应接收操作的数量就应该约等于此刻的并发量。在响应超时时间为5秒钟的设定下,如果我们每隔1秒钟向被测软件发送一个载荷,那么这个并发量就是5。而如果我们每隔1毫秒发送一个载荷,那么该并发量就应该是5000。这里所说的发送间隔时间是可以由载荷发生器的lps字段的值计算得出的。concurrencytimeoutNslps这3个字段的值之间的关系如下:

  1. concurrency timeoutNs / (1e9 / lps) + 1

其中,表达式1e9 / lps所表示的就是根据使用方对lps的值的设定计算出的载荷发生器发送载荷的间隔时间,单位是纳秒。1e9代表了1秒钟对应的纳秒数。这样,表达式timeoutNs / (1e9 / lps)的含义就是在响应超时时间代表的某一个时间周期内的并发量的最大值。而最后与之相加的1则代表了在某一个时间周期之初向被测软件发送的那个载荷。

对于一个通用的性能测试软件来说,这已经算是比较准确的换算方式了。因为我们无法得知被测软件对于每一个载荷的响应都会经过多长时间才能够被返回。实际上,这个真实的载荷响应时间也应该是我们通过性能测试得到的数值之一。换句话说,性能测试软件要做的就是,先通过若干个预设的限定值模拟出一定程度的负载,然后再以此来测试并得到被测试软件实际能承受的最大负载。我们在前面讲到的响应超时时间、每秒载荷发送量和负载持续时间,以及经过计算得出的并发量都属于预设的限定值。

我们计算并发量的最大意义是:为约束被并发运行的Goroutine的数量提供支撑。也就是说,我们会依此数值确定载荷发生器的tickets字段所代表的那个Goroutine票池的容量。这个容量也可以被理解为Goroutine票的总数量。Goroutine票池的初始化工作是由lib.NewGoTickets函数来完成的。对该函数的调用如下:

  1. tickets, err := lib.NewGoTickets(gen.concurrency)

在讲述该函数的内部实现之前,我们先来看看tickets字段的类型lib.GoTickets。它也是lib.NewGoTicket函数的第一个结果的类型。此接口类型的声明如下:

  1. // Goroutine票池的接口。
  2. type GoTickets interface {
  3. // 获得一张票。
  4. Take()
  5. // 归还一张票。
  6. Return()
  7. // 票池是否已被激活。
  8. Active() bool
  9. // 票的总数。
  10. Total() uint32
  11. // 剩余的票数。
  12. Remainder() uint32
  13. }

这个接口类型包含了4个方法声明。其中,方法TakeReturn是对应的。前者的功能是从票池获得一张票,而后者在被调用之后会向票池归还一张票。读者可能会感觉这里的“获得”和“归还”操作都比较抽象,不太好理解。确实是这样。实际上,Goroutine票池既不会关心使用方从它那里获得的是哪一张票,也不需要知道使用方把哪一张票归还给了它。这里的“票”本身就是一个抽象的概念。就像我们在上一节讲单向的非缓冲通道的时候所说的那样,这里的票其实就相当于程序为了启用一个Goroutine而必须持有的令牌。Goroutine票池只负责增减票的数量并以此真实地体现出正在被运行的专用Goroutine的数量。

我们也可以把Goroutine票池看成一个POSIX标准中描述的多值信号灯。一个POSIX多值信号灯的值代表了可用资源的数量。资源使用方获得或归还资源时会及时减少或增大该信号灯的值,以便其他使用方实时了解相应资源的使用情况。在该值被减至0之后,所有试图减少该值的程序(或者说进程)都会为此而被阻塞。而当该值重新被增至一个正整数的时候,这些程序就都会被唤醒。不过,只会有与信号灯的数值相同的个数的程序能够成功的对其进行减1操作,而其他程序则会被迫继续等待。Goroutine票池所表现的行为与多值信号灯非常地类似。lib.GoTickets接口的Take方法和Return方法分别对应了多值信号灯上的增1操作和减1操作。

接口类型lib.GoTickets中的后3个方法声明应该很好理解。一旦Goroutine票池被正确地初始化,Active方法返回的结果值就应该是true。而Total方法和Remainder方法在被调用后则分别会返回代表了票的总数和剩余数的结果值。

根据lib.GoTickets接口的类型声明以及我们上面的描述,我们很容易地编写出了该接口的实现类型。与lib.Generator接口的实现类型一样,该类型也是一个指针类型,名为lib.myGoTickets。我们在3.2节中已经说明过结构体类型与指向它的指针类型之间的区别,也强调了与之相对应的值方法和指针方法之间的若干不同。注意,如果我们想要让lib.myGoTickets类型成为接口类型lib.GoTickets的一个实现类型的话,就必须为lib.myGoTickets类型编写出与lib.GoTickets接口中声明的方法一一对应的5个指针方法。

首先,让我们先来编写lib.myGoTickets类型的基本结构:

  1. // Goroutine票池的实现。
  2. type myGoTickets struct {
  3. total uint32 // 票的总数。
  4. ticketCh chan byte // 票的容器。
  5. active bool // 票池是否已被激活。
  6. }

其中的total字段的含义就是我们刚刚所说的Goroutine票的总数量。ticketCh字段则代表了承载Goroutine票的容器。之所以使用chan byte作为它的类型,是因为我们要以最简单且最节省系统资源的方式来实现Goroutine票的“获得”和“归还”操作,并使它具有自同步特性。而第三个字段active,正是被用来表示当前的Goroutine票池是否已被正确地初始化的。

根据lib.myGoTickets类型中的这3个字段,我们可以立即编写出它的那5个指针方法(请参照lib.GoTickets类型的声明)。这样,我们才能使*lib.GoTickets类型成为lib.GoTickets接口的一个实现类型。这应该并不困难,请读者自己动手试一试。在必要时,读者可以参看前面的说明文字和我们在上一节编写的fecthPerson函数以及相关的描述。那里提及了使用缓冲通道作为Goroutine票池的典型用法。

希望读者已经自己动手编写完成了lib.myGoTickets类型的所有必要的公共方法。不知读者是否考虑到了我们前面所说的正确地初始化的问题。在我编写的版本中,对lib.myGoTickets类型值的初始化工作都由它的包级私有的指针方法init来进行。这个方法的完整声明如下:

  1. func (gt *myGoTickets) init(total uint32) bool {
  2. if gt.active {
  3. return false
  4. }
  5. if total == 0 {
  6. return false
  7. }
  8. ch := make(chan byte, total)
  9. n := int(total)
  10. for i := 0; i < n; i++ {
  11. ch <- 1
  12. }
  13. gt.ticketCh = ch
  14. gt.total = total
  15. gt.active = true
  16. return true
  17. }

在该方法中,我们在最前面做了一些前置性的检查,并且在最后面对接收者值(即那个lib.myGoTickets类型的当前值)中的字段进行了赋值。然而,在此方法体中间的那几行代码却是最关键的。我们以参数total的值作为容量初始化了一个元素类型为byte的缓冲通道ch。我们本可以把这个缓冲通道直接赋给当前值的ticketCh字段,但是却没有这么做。还记得吗?我们在前面说过,lib.myGoTickets类型值会用它的ticketCh字段的值来作为承载Goroutine票的容器。这就意味着,该通道中缓冲的元素值的个数就代表了还没有被获得和已被归还的Goroutine票的总和。那么,在Goroutine票池被初始化的时候,其中所有的Goroutine票应该都没有被获得。因此,在此时,我们应该让该通道缓冲的元素值的个数与其容量相等。如果我们不这么做,那么之后所有试图从该Goroutine票池中获得Goroutine票的Goroutine都会被阻塞,从而会使所有的载荷发送操作都无法进行下去。这也意味着载荷发生器的主流程的执行的停滞。它永远也不会输出测试结果,并且它也无法被运行结束。

相信读者已经理解了“正确的初始化”的含义。在lib.myGoTickets类型的init方法被编写完成之后,lib.NewGoTickets函数就可以非常方便地创建并初始化一个lib.GoTickets类型的值了,像这样:

  1. func NewGoTickets(total uint32) (GoTickets, error) {
  2. gt := myGoTickets{}
  3. if !gt.init(total) {
  4. errMsg :=
  5. fmt.Sprintf("The goroutine ticket pool can NOT be initialized! (total=%d)\n", total)
  6. return nil, errors.New(errMsg)
  7. }
  8. return &gt, nil
  9. }

如上所示,我们先用复合字面量myGoTickets{}创建了一个lib.myGoTickets类型的值。不过,在此时,该值的所有字段的值都还只是相应类型的零值。我们还需要调用这个lib.myGoTickets类型值的init方法以完成初始化。该值的init方法会返回一个bool类型的结果值以告知初始化工作的成功与否。如果初始化不成功,那么我们就应该及时生成一个error类型值并将其返回给lib.NewGoTickets函数的调用方。否则,代表了已初始化的*lib.myGoTickets类型值的&gt就应该被返回。无论怎样,我们总是应该在该函数中返回两个结果值。

好了,现在我们编写完成了对myGenerator类型中的所有字段的初始化代码。现在,我们也可以只通过调用NewGenerator函数就创建出一个立即可用的载荷发生器了。NewGenerator函数的完成声明如下:

  1. func NewGenerator(
  2. caller lib.Caller,
  3. timeoutNs time.Duration,
  4. lps uint32,
  5. durationNs time.Duration,
  6. resultCh chan *lib.CallResult) (lib.Generator, error) {
  7. logger.Infoln("New a load generator...")
  8. logger.Infoln("Checking the parameters...")
  9. var errMsg string
  10. if caller == nil {
  11. errMsg = fmt.Sprintln("Invalid caller!")
  12. }
  13. if timeoutNs == 0 {
  14. errMsg = fmt.Sprintln("Invalid timeoutNs!")
  15. }
  16. if lps == 0 {
  17. errMsg = fmt.Sprintln("Invalid lps(load per second)!")
  18. }
  19. if durationNs == 0 {
  20. errMsg = fmt.Sprintln("Invalid durationNs!")
  21. }
  22. if resultCh == nil {
  23. errMsg = fmt.Sprintln("Invalid result channel!")
  24. }
  25. if errMsg != "" {
  26. return nil, errors.New(errMsg)
  27. }
  28. gen := &myGenerator{
  29. caller: caller,
  30. timeoutNs: timeoutNs,
  31. lps: lps,
  32. durationNs: durationNs,
  33. stopSign: make(chan byte, 1),
  34. cancelSign: 0,
  35. status: lib.STATUS_ORIGINAL,
  36. resultCh: resultCh,
  37. }
  38. logger.Infof("Passed. (timeoutNs=%v, lps=%d, durationNs=%v)",
  39. timeoutNs, lps, durationNs)
  40. err := gen.init()
  41. if err != nil {
  42. return nil, err
  43. }
  44. return gen, nil
  45. }

注意,变量gen的类型是myGenerator而不是myGenerator。如果在初始化过程中没有发生任何错误,那么NewGenerator函数就会把gen变量的值作为结果值返回给调用方。不过,到现在为止,我们还未说明怎样编写myGenerator类型的指针方法StartStopStatus。它们是使myGenerator类型成为Generator接口的实现类型的关键。当然,我们可以先在这里编写出相应的空方法(即方法体中无任何可作为具体实现的语句的方法)来使编译通过。

眼尖的读者可能会立即发现,我们用字面量初始化这个myGenerator类型的值的时候,为一个未曾讲过的字段cancelSign赋了值。这里先不对此进行解释,到后面再进行说明。

另外,在这段代码中出现的标识符logger代表的是loadgen包中声明的一个变量。相关代码如下:

  1. var logger logging.Logger
  2. func init() {
  3. logger = logging.NewSimpleLogger()
  4. }

变量logger会在代码包初始化函数init中被初始化。这用到了goc2p项目的logging代码包中的函数NewSimpleLogger。顾名思义,logging.Logger类型的值被用来以各种方式记录日志。这些日志可能会被打印或传送到任何地方。但是,可以肯定的是,由NewSimpleLogger函数返回的logging.Logger类型值只会将日志打印到标准输出上。我们还会在后面见到使用logger变量的代码。与此相关的具体实现,请读者阅读logging包中的代码。

在本节后面的部分中,我们会具体讨论载荷发生器的总体流程以及myGenerator类型的那3个公共的指针方法的编写方法。

7.3.4 启动和停止

我们会在本小节讨论启动和停止载荷发生器的相关流程,并完成对myGenerator类型的编写。

前文说过,我们调用载荷发生器的Start方法就可以启动它。在这之后,载荷发生器会按照我们给定的参数向被测软件发送一定量的载荷(或者说调用被测软件的API并传送一定量的请求)。在达到了我们指定的负载持续时间之后,载荷发生器会自动停止载荷发送操作。在从启动到停止的这个时间段内,载荷发生器还会将被测软件对各个载荷的响应(如果有的话)以及载荷发送的最终结果收集起来并发送给我们提供的调用结果通道。

这个流程看起来并不复杂。但实际上,其中包含了很多的细节。比较重要的就是有效控制载荷发送的并发量以及载荷发生器本身使用Goroutine的数量。还好,我们在对载荷发生器进行初始化的时候已经为此做好了准备。下面我们就来看看启动流程的具体实现方法。

1. 启动的准备

我们之前说过,载荷发生器的lps字段的值指明了它每秒向被测软件发送载荷的数量。根据这个值,我们可以很轻易地得到发送间隔时间。相应的表达式为1e9 / lps。为了让发送间隔时间能够起到实质性的作用,我们需要使用缓冲通道和断续器。还记得吗?我们在上一节介绍过断续器。它非常适合被作为定时任务的触发器。请看下面的代码:

  1. // 设定节流阀
  2. var throttle <-chan time.Time
  3. if gen.lps > 0 {
  4. interval := time.Duration(1e9 / gen.lps)
  5. logger.Infof("Setting throttle (%v)...", interval)
  6. throttle = time.Tick(interval)
  7. }

我们给被用来触发定时任务的缓冲通道起了一个看起来很酷的名字——节流阀。为了配合断续器的使用,我们将它的类型设定为<-chan time.Time。这是一个单向通道类型。之所以在这里进行通道方向上的限制不会有什么问题,是因为我们仅仅会通过调用time.Tick函数为变量throttle赋值。我们知道,time.Tick函数的结果值就是这个类型的。该结果值是一个可以周期性地传达到期事件的缓冲通道。作为约束,time.Tick函数只允许它的调用方从该通道中接收元素值。

如果lps字段的值大于0,我们就会算出发送间隔时间并依据这个时间设置断续器。变量throttle所代表的节流阀会直接、及时地体现出断续器的行为。

我们暂时把创建好的节流阀放在一边。不用担心,我们已经说明过,断续器的功能不会因代表到期事件的通道元素值未被及时接收而受到影响。

在真正使用节流阀之前,我们还有另外一个准备工作要做,即让载荷发生器能够在运行一段时间之后自己停下来。这里的一段时间就是我们先前给定的负载持续时间。还记得吗?载荷发生器有个被用来传递停止信号的缓冲通道stopSign。我们已经在前面对它进行了初始化。它的长度被设定为了1。我们下面就利用time包中的time.AfterFunc函数来实现定时的向stopSign发送停止信号的功能。

首先,为了让这个定时发送操作不阻碍启动流程的执行,我们需要把它放到一个专用的Goroutine中进行。与time.After函数相比,使用time.AfterFunc函数的优势是可以直接在调用表达式中自定义处理到期事件的操作。因此,实现初始化停止信号的功能的代码应该如下所示:

  1. // 初始化停止信号
  2. go func() {
  3. time.AfterFunc(gen.durationNs, func() {
  4. logger.Infof("Stopping load generator...")
  5. gen.stopSign <- 0
  6. })
  7. }()

如上所示,当经过由载荷发生器的durationNs字段代表的一段时间之后,该Goroutine中的代码会向停止信号传递通道发送一个元素值以表示停止载荷发生器的流程应该结束了。

为什么使用一个通道来传递停止信号而不是直接结束当前进程呢?主要原因是,我们想要让正在执行中的各个子流程自行结束。这样的结束方法显然会比强制性的结束更加合理和安全。那么stopSign字段是怎样起到让各个子流程自行结束的作用的呢?别着急,我们稍后就会讲到。

到这里,为了启动载荷发生器而进行的所有准备工作都已完成。我们可以改变当前的载荷发生器的状态了:

  1. // 设置已启动状态
  2. gen.status = lib.STATUS_STARTED

2. 控制流程

在进入到已启动的状态之后,载荷发生器才真正开始生成并发送载荷。包含了载荷发送操作和载荷响应接收操作的调用过程应该是被异步执行的。因为只有这样,载荷发生器才能够从总体上管理和控制它们。请看下面的这个函数:

  1. func (gen *myGenerator) genLoad(throttle <-chan time.Time, endSign chan<- uint64) {
  2. callCount := uint64(0)
  3. Loop:
  4. for ; ; callCount++ {
  5. select {
  6. case <-gen.stopSign:
  7. gen.handleStopSign()
  8. endSign <- callCount
  9. break Loop
  10. default:
  11. }
  12. gen.asyncCall()
  13. if gen.lps > 0 {
  14. select {
  15. case <-throttle:
  16. case <-gen.stopSign:
  17. gen.handleStopSign()
  18. endSign <- callCount
  19. break Loop
  20. }
  21. }
  22. }
  23. }

我们使用这个名为genLoad的载荷发生器的指针方法来从总体上控制各个调用流程的执行。该方法接受两个参数,第一个参数就是我们在前面已经准备好的节流阀throttle,而第二个参数endSign则承担着传递停止信号的响应的任务。这个响应不只是被用来表达所有调用过程都已接收到载荷发生器欲停止的通知的。它还被用于向genLoad方法的调用方传递载荷发送的总数。这也是我们把它的元素类型设定为uint64的原因。在本小节的最后,读者会了解到,为了在载荷发生器的Stop方法中正确设置它的第一个结果值,我们需要提升endSign通道的作用域。

genLoad方法的方法体中,我们使用一个for循环来周期性的向被测软件发送载荷。这个周期的长短是由节流阀控制的。在循环体的最后,如果lps字段的值大于0,那么就表示节流阀是有效并需要使用的。这时,我们利用select语句来等待节流阀中的到期事件。一旦接收到了这样一个事件,我们就立即开始下一次迭代(即开始生成并发送下一个载荷)。当然,如果在等待节流阀的到期事件的过程中接收到了停止信号,那么我们就应该立即对它进行处理并终止当前的循环。正因为如此,genLoad方法中的第二条select语句有两个case

针对stopSign通道的接收操作也出现在了for循环的开始处。这是因为我们要对及时地处理停止信号做进一步的保证。在这条select语句中,我们加入了default case。原因是我们只想在这里检测一下stopSign通道中是否存在停止信号,而不想因此而阻塞迭代的执行。

一旦发现有停止信号,我们就需要立即进行相应的处理。停止信号的处理代码被放在了载荷发生器的handleStopSign方法中。它其实非常简单:

  1. func (gen *myGenerator) handleStopSign() {
  2. gen.cancelSign = 1
  3. logger.Infof("Closing result channel...")
  4. close(gen.resultCh)
  5. }

载荷发生器的cancelSign字段也是作为实现可控性的一个部分存在的。它的作用是,在接收到停止信号之后告知正在执行中的各个调用过程,以使它们取消掉还未被进行的操作。这也是cancelSign这个名称的由来。注意,cancelSign字段的类型不是一个通道类型,而是值占用空间最小的byte类型。这是因为我们只会在处理停止信号的时候把它的值从0改变为某一个正整数。由于该字段的特殊用途,所以它的值也永远不会被修改回0。另外,各个调用过程对该字段的的值的判断也是极其简单的,只会判断其是否大于0。因此,即使我们的程序会并发地修改它的值也是无关紧要的。在这些前提之下,我们让cancelSign字段尽可能地简单化了。我们在后面会了解到调用过程对cancelSign的值的判断以及相应的处理。

handleStopSign方法中,我们先把1赋给了载荷发生器的cancelSign字段。然后,我们关闭了代表了调用结果通道的resultCh字段。这就意味着我们放弃了当前还未被执行完成的调用过程的结果。

在调用handleStopSign方法之后,我们向endSign通道发送了载荷发送的计数。最后,代表了控制流程的for语句的执行宣告结束。

3. 异步地调用

为了让控制流程与各个调用过程分离开来,我们又编写了载荷发生器的asyncCall方法。该方法的作用就是异步地执行每一个调用过程。更详细地说,一个调用过程分为5个操作步骤,即生成载荷、发送载荷并接收响应、检查载荷响应、生成调用结果,发送调用结果。其中的前3个操作步骤都会由使用方在初始化载荷发生器时传入的那个调用器中的方法来完成。特别提示,我们至此仍未用到的载荷发生器的timeoutNs字段应该在这个调用过程中发挥作用了。

既然说是异步地调用,那么读者应该能想到我们会在asyncCall方法中新启用一个Goroutine来完成调用过程。更确切地说,asyncCall方法每次被调用之后都会启用一个专用的Goroutine。这里所说的专用的Goroutine就是我们之前讲过的与Goroutine票池中的Goroutine票对应的Goroutine。因此,在asyncCall方法中,我们就应该在适当的时候对Goroutine票池中的票进行“获得”和“归还”操作,如下所示:

  1. func (gen *myGenerator) asyncCall() {
  2. gen.tickets.Take()
  3. go func() {
  4. // 省略若干条语句
  5. gen.tickets.Return()
  6. }()
  7. }

我们在启用专用Goroutine之前,从Goroutine票池获得一张Goroutine票。当Goroutine票池中已无票可拿时,asyncCall方法所属的Goroutine会被阻塞于此。只有存在多余的Goroutine票的时候,专用Goroutine才会被启用,从而当前的调用过程才会被执行。另一方面,在这个go函数的最后,我们会及时地把票归还给Goroutine票池。这个归还的时机非常重要,既不能提前也不能延后。

好了,现在异步调用的外围框架已经有了。下面我们来看看专用Goroutine需要执行的语句。第一个操作步骤当然是生成载荷。因为有了调用器,所以这里的代码相当简单:

  1. rawReq := gen.caller.BuildReq()

可以看到,我们仅仅是对调用器的BuildReq方法进行了调用并把其返回的原生请求暂存起来而已。

关于发送载荷并接收响应的这个步骤,我们很有必要详细说明一下。首先是对载荷发生器的timeoutNs字段的使用。我们已经知道,该字段应该起到辅助载荷发生器实时判断被测软件处理单一载荷是否超时的作用。我们之前讲过,time包中的定时器可以被用来设定某一个操作或任务的超时时间。要做到实时的判断超时,最好的方式就是与通道和select语句联用。不过,这就需要再启用一个Goroutine来执行发送载荷并接收响应的操作步骤了。那么,我们可以在不额外启用Goroutine的情况下实现实时的超时判断吗?答案是,可以。但是这需要一些技巧。

在讲述这些技巧之前,让我们先来看看联用定时器、通道和select语句以及再启用一个Goroutine的做法是怎样的。代表了执行发送载荷并接收响应的操作步骤(以下简称交互操作)的代码都被封装到了载荷发生器的interact方法中。该方法的声明如下:

  1. func (gen *myGenerator) interact(rawReq *lib.RawReq, rawRespCh chan<- *lib.RawResp)

它接受两个参数。第一个参数就是需要被发送给被测软件的原始请求,而第二个参数则是被用来传递原始响应的通道(以下简称原始响应通道)。在从被测软件处接收到响应之后,interact方法会把它封装成*lib.RawResp类型值并发送给原始响应通道。

我们先要创建并初始化interact方法所需的第二个参数值,然后再使用go语句异步地执行该方法。具体代码如下:

  1. var result *lib.CallResult
  2. rawRespCh := make(chan *lib.RawResp, 1)
  3. go gen.interact(&rawReq, rawRespCh)

前两行代码分别创建了代表了调用结果值的result变量和原始响应通道,而第三行代码的功能则是额外启用一个Goroutine来执行载荷发生器的interact方法。

在这之后的select语句就很好编写了。我们为它添加两个case,一个case试图从原始响应通道中接收元素值,而另一个case则根据timeoutNs字段的值创建一个定时器并等待相应的到期事件的来临。一旦从原始响应通道接收到了元素值,就意味着交互操作的完成。但是,如果在这之前定时器的到期事件抢先到达并被接收了,那么就说明交互操作的执行超时了。此时,载荷发生器就不应该再等待交互操作的执行结果了,而应该直接向调用结果通道发送一个描述了交互操作的超时情况的调用结果值。与此对应的代码如下:

  1. select {
  2. case rawResp := <-rawRespCh:
  3. if rawResp.Err != nil {
  4. result = &lib.CallResult{
  5. Id: rawResp.Id,
  6. Req: rawReq,
  7. Code: lib.RESULT_CODE_ERROR_CALL,
  8. Msg: rawResp.Err.Error(),
  9. Elapse: rawResp.Elapse}
  10. } else {
  11. result = gen.caller.CheckResp(rawReq, *rawResp)
  12. result.Elapse = rawResp.Elapse
  13. }
  14. case <-time.After(gen.timeoutNs):
  15. result = &lib.CallResult{
  16. Id: rawReq.Id,
  17. Req: rawReq,
  18. Code: lib.RESULT_CODE_WARNING_CALL_TIMEOUT,
  19. Msg: fmt.Sprintf("Timeout! (expected: < %v)", gen.timeoutNs)}
  20. }
  21. gen.sendResult(result)

在这段代码中,调用表达式gen.caller.CheckResp(rawReq, *rawResp)的含义是通过调用器的CheckResp方法检查原始响应并生成最终的响应结果。当然,只有在及时接收到未携带任何错误的原始响应的情况下,我们才需要对原始响应进行这样的检查。除此之外,我们仅通过复合字面量来部分初始化表示出现了某类错误的响应结果就可以了。

载荷发生器的sendResult方法的功能是向调用结果通道发送一个调用结果值。另外,为了更好地定义调用结果值中代表了响应代码的code字段的不同值所代表的含义,我们在loadgen/lib包中声明了如下的常量:

  1. // 保留 1 ~ 1000 给载荷承受者使用。
  2. const (
  3. RESULT_CODE_SUCCESS = 0 // 成功。
  4. RESULT_CODE_WARNING_CALL_TIMEOUT ResultCode = 1001 // 调用超时警告。
  5. RESULT_CODE_ERROR_CALL ResultCode = 2001 // 调用错误。
  6. RESULT_CODE_ERROR_RESPONSE ResultCode = 2002 // 响应内容错误。
  7. RESULT_CODE_ERROR_CALEE ResultCode = 2003 // 被调用方(被测软件)的内部错误。
  8. RESULT_CODE_FATAL_CALL ResultCode = 3001 // 调用过程中发生了致命错误!
  9. )

因此,我们可以在前一段代码中看到多个诸如lib.RESULT_CODE_WARNING_CALL_TIMEOUT的限定标识符。lib.RESULT_CODE_WARNING_CALL_TIMEOUT即表示在交互操作的执行已超时的情况下的响应代码。由于这个超时的时间是由载荷发生器的使用者指定的,所以此类情况并不能算是严格意义上的错误。因此,我们在这里把该响应代码归为警告级别。另一个值得说明的响应代码常量是RESULT_CODE_FATAL_CALL。该常量表示调用过程中的致命错误,即指调用器未预料到的错误。这很可能是调用器自身的错误,不过也不排除存在因调用器对调用过程中可能发生的错误预估不足而没有及时“抓住”错误的这种情况。在载荷发生器中,这类错误会由一个从调用器的某个方法中扩散出来的运行时恐慌表示。我们需要在载荷发生器的interact方法中加入处理这类错误的代码,不能让运行时恐慌外泄并影响到载荷发生器的控制流程的执行。因此,在调用interact方法之前插入一条defer语句是必须的:

  1. defer func() {
  2. if p := recover(); p != nil {
  3. err, ok := interface{}(p).(error)
  4. var buff bytes.Buffer
  5. buff.WriteString("Async Call Panic! (")
  6. if ok {
  7. buff.WriteString("error: ")
  8. buff.WriteString(err.Error())
  9. } else {
  10. buff.WriteString("clue: ")
  11. buff.WriteString(fmt.Sprintf("%v", p))
  12. }
  13. buff.WriteString(")")
  14. errMsg := buff.String()
  15. logger.Fatalln(errMsg)
  16. result := &lib.CallResult{
  17. Id: -1,
  18. Code: lib.RESULT_CODE_FATAL_CALL,
  19. Msg: errMsg}
  20. gen.sendResult(result)
  21. }
  22. }()

我们在第4章讲异常处理的时候详细探讨过运行时恐慌的处理方法。由此可知,recover函数的结果值的动态类型是未知的(其静态类型是interface{})。又由于在用的调用器很可能是载荷发生器的使用方自行实现的,所以在这里就更增加了不确定性。因此,我们先使用类型断言表达式来判断变量p的动态类型是否为error,然后再根据判断结果组织不同内容的结果成因简述(即会赋给调用结果值的Msg字段的值)。在这条defer语句的最后,我们把这个代表了调用致命错误的调用结果值发送给调用结果通道。

上面这条defer语句会作为与专用Goroutine对应的匿名go函数中的第一条语句出现。在它之后的就是前面展示的那两条分别声明变量resultrawRespCh的语句、被用来异步执行interact方法的go语句,以及那条最为关键的select语句。至此,专用Goroutine所对应的匿名go函数的编写即将完成。不过不要忘了,我们要在这个go函数即将被执行结束的时候,把持有的Goroutine票归还给Goroutine票池:

  1. gen.tickets.Return()

这条语句实际上已经在前面的示例中出现过了。这里只是再强调一下。

好了,我们已经编写完成了在asyncCall方法中使用一个专用的Goroutine执行一个调用过程的全部代码。这个asyncCall方法的实现看起来还不错。可是,如果我们再次纵观这段代码的话就会发现,我们实际上是启用了两个Goroutine来执行一个调用过程。一个Goroutine就是我们所说的专用Goroutine。而在专用Goroutine中,我们又启用了一个Goroutine来异步地执行interact方法。显然,这与我们的初衷有些出入。我们本想用Goroutine票池来限制专用Goroutine的总数量,但是实际情况却是,被用来执行调用过程的Goroutine的最大数量可能是Goroutine票池的总容量的两倍。启用额外的Goroutine的主要原因是我们需要实现针对交互操作的实时超时判断。如果我们能够用另一种方式实现这个实时超时判断,那么就能够避免这个额外Goroutine的启用了。为了达到这个目的,就需要对asyncCall方法的实现进行必要的改造。或许,还需要修改一下interact方法的签名。

我们先来试着改造一下asyncCall的方法的select语句中的第一个case表达式,像这样:

  1. case rawResp := <-func() <-chan *lib.RawResp {
  2. rawRespCh := make(chan *lib.RawResp, 1)
  3. gen.interact(&rawReq, rawRespCh)
  4. return rawRespCh
  5. }():

可以看到,我们使用匿名函数func() <-chan *lib.RawResp来作为该case的通道表达式。在这个匿名函数中,我们顺序地执行了interact方法,并将相应的原始响应通道作为了该匿名函数的结果值。乍一看,这样做似乎也是可以的。但是,还记得吗?我们在上一节讲到,运行时系统会先对select语句的各个case中的通道表达式和元素表达式进行求值,而且在所有的求值都完成以后才会去考虑选择某一个case执行。对于上面的这个被改造后的case来说,匿名函数中的interact方法会被早早地执行完毕。更重要的是,这条select语句的第二个case中的那个定时器直到第一个case中的通道(即那个匿名函数的结果值)已经缓冲了一个原始响应之后才会被启动。这相当于在执行完interact方法之后再去启动定时器。显然,这个定时器根本就起不到它应有的作用。所以,这个解决方案是不符合要求的。

现在让我们从头再来。实际上,我们想要的只是实时超时判断的异步化,而串行的执行交互操作应该是没有问题的。异步地进行实时超时判断其实并不困难。还记得time.AfterFunc函数吗?它可以让我们自定义在定时器的到期事件来临之时执行的处理函数。我们在前面说过,当定时器的到期事件抢先到来的时候,载荷发生器的asyncCall方法就应该直接向调用结果通道发送一个代表了交互操作超时的调用结果值,而不用再去理会交互操作。因此,我们应该这样来自定义超时处理函数以及调用time.AfterFunc函数:

  1. var timeout bool
  2. timer := time.AfterFunc(gen.timeoutNs, func() {
  3. timeout = true
  4. result := &lib.CallResult{
  5. Id: rawReq.Id,
  6. Req: rawReq,
  7. Code: lib.RESULT_CODE_WARNING_CALL_TIMEOUT,
  8. Msg: fmt.Sprintf("Timeout! (expected: < %v)", gen.timeoutNs)}
  9. gen.sendResult(result)
  10. })

可以看到,我们先声明了一个名为timeoutbool类型的变量(以下简称超时标识)。并且,在作为参数传递给time.AfterFunc函数的那个匿名的超时处理函数中,我们一开始就对它进行了设置。一旦超时标识timeout的值被设置为true,就说明交互操作已经执行超时。我们会在交互操作被执行完成之后去判断它。请看下面的代码:

  1. rawRespCh := make(chan *lib.RawResp, 1)
  2. gen.interact(&rawReq, rawRespCh)
  3. rawResp := <-rawRespCh
  4. if !timeout {
  5. timer.Stop()
  6. var result *lib.CallResult
  7. if rawResp.Err != nil {
  8. // 省略若干条语句
  9. } else {
  10. // 省略若干条语句
  11. }
  12. gen.sendResult(result)
  13. }

这段代码表示了交互操作的串行执行,以及后续的调用结果值的生成过程。后者的实现代码与前面展示过的代码并无不同,因此我们在这里省略掉了一些细节。我们需要关注的其实只有两个地方,即if语句的第一行和第二行。在从通道中接收到一个原始响应之后,我们会首先判断超时标识timeout的值。仅当其值为false的情况下,我们才会进行后续的操作。否则,我们就会忽略掉这个原始响应。这正符合我们声明timeout变量的初衷。此外,在需要进行后续操作的时候,我们在第一时间就停止了定时器。其意义是及时阻止自定义超时处理函数的执行。如若不然,一个已经无效的、代表交互操作执行超时的调用结果值就会在超时时间到来时被发送到调用结果通道中。

通过对timeout变量的赋值和判断以及对timer.Stop方法的调用,我们实现了异步化的实时超时判断子流程与作为主线的调用流程之间的同步。同时,我们还避免了额外的Goroutine的启用。这就意味着,对Goroutine票池的使用可以完全起到约束专用Goroutine数量的作用了。

至此,我们对asyncCall方法的改造已经基本完成。不过,不知读者是否发现,在经过这样的改造之后,原始响应通道rawRespCh已经显得有些多余了。因此,我们对interact方法的声明进行了修改:

  1. func (gen *myGenerator) interact(rawReq *lib.RawReq) *lib.RawResp

然后在asyncCall方法中完全去掉了rawRespCh通道,即将这几行代码

  1. rawRespCh := make(chan *lib.RawResp, 1)
  2. gen.interact(&rawReq, rawRespCh)
  3. rawResp := <-rawRespCh

变更为:

  1. rawResp := gen.interact(&rawReq)

为了便于读者阅读,我们在这里给出载荷发生器的asyncCall方法的完整实现:

  1. func (gen *myGenerator) asyncCall() {
  2. gen.tickets.Take()
  3. go func() {
  4. defer func() {
  5. if p := recover(); p != nil {
  6. err, ok := interface{}(p).(error)
  7. var buff bytes.Buffer
  8. buff.WriteString("Async Call Panic! (")
  9. if ok {
  10. buff.WriteString("error: ")
  11. buff.WriteString(err.Error())
  12. } else {
  13. buff.WriteString("clue: ")
  14. buff.WriteString(fmt.Sprintf("%v", p))
  15. }
  16. buff.WriteString(")")
  17. errMsg := buff.String()
  18. logger.Fatalln(errMsg)
  19. result := &lib.CallResult{
  20. Id: -1,
  21. Code: lib.RESULT_CODE_FATAL_CALL,
  22. Msg: errMsg}
  23. gen.sendResult(result)
  24. }
  25. }()
  26. rawReq := gen.caller.BuildReq()
  27. var timeout bool
  28. timer := time.AfterFunc(gen.timeoutNs, func() {
  29. timeout = true
  30. result := &lib.CallResult{
  31. Id: rawReq.Id,
  32. Req: rawReq,
  33. Code: lib.RESULT_CODE_WARNING_CALL_TIMEOUT,
  34. Msg: fmt.Sprintf("Timeout! (expected: < %v)", gen.timeoutNs)}
  35. gen.sendResult(result)
  36. })
  37. rawResp := gen.interact(&rawReq)
  38. if !timeout {
  39. timer.Stop()
  40. var result *lib.CallResult
  41. if rawResp.Err != nil {
  42. result = &lib.CallResult{
  43. Id: rawResp.Id,
  44. Req: rawReq,
  45. Code: lib.RESULT_CODE_ERROR_CALL,
  46. Msg: rawResp.Err.Error(),
  47. Elapse: rawResp.Elapse}
  48. } else {
  49. result = gen.caller.CheckResp(rawReq, *rawResp)
  50. result.Elapse = rawResp.Elapse
  51. }
  52. gen.sendResult(result)
  53. }
  54. gen.tickets.Return()
  55. }()
  56. }

asyncCall方法中,我们充分地使用了载荷发生器的使用方提供的调用器。在上述代码中,调用器的BuildReq方法和CheckResp方法各被调用了一次。而对它的Call方法的调用则被隐含在了负责进行交互操作的载荷发生器的interact方法中。

实际上,我们在载荷发生器的interact方法中只做了一件事,那就是对调用器的Call方法进行调用,即与被测软件进行一次交互操作。当然,为了真实地记录一次交互操作,我们还需要添加一些其他代码以进行诸如检查原始请求的有效性、记录对调用器的Call方法的调用耗时,以及根据Call方法的结果值生成原始响应等操作。

4. 启动和停止

让我们再次回到启动载荷发生器的话题上来。在一切准备工作完成之后,载荷发生器的Start方法中的代码会调用genLoad方法以执行控制流程。同时,它还会把一个名为endSign的缓冲通道作为参数传给genLoad方法。genLoad方法一旦从stopSign字段代表的通道处接收到停止信号,就会立即做出响应。它首先会把cancelSign字段的值设置为1,然后关闭调用结果通道。最后,它会向endSign通道发送一个代表了调用执行计数的元素值。endSign通道有两个作用。第一个作用是告知停止信号已被处理完毕,载荷发生器已经可以进入到已停止状态了。而第二个作用就是传递调用执行计数。这个告知和传递的对象有可能是Start方法中的代码,也可能是Stop方法中的代码。我们稍后会介绍它们对此的不同处理方式。

载荷发生器的停止流程总共涉及了3个被用来代表或传递信号的变量:stopSignendSigncancelSign。它们的作用都是在载荷发生器自动或被手动地停止的时候协调和传递其中的某种状态。通道stopSign被用来传递由定时器或载荷发生器的Stop方法中的代码所发送的停止信号。而endSign通道的作用则正如我们刚刚所说的那样。至于byte类型的载荷发生器字段cancelSign,我们至今还未说明它的真正作用。

为了让读者能够清晰地了解到载荷发生器联用这3个“信号”的方式,我绘制了一幅流程图(见图7-4)。在这幅流程图中,我们以载荷发生器自动停止的情况为例。

{%}

图 7-4 载荷发生器的自动停止流程

在载荷发生器的自动停止流程中,控制流程对cancelSign字段的设置会影响到所有已经开始执行的异步调用过程。更具体地说,各个异步调用过程在把调用结果发送给调用结果通道之前会先检查相应的充分条件,并且仅在充分条件被满足的情况下才会真正发送调用结果。具体代码如下:

  1. func (gen *myGenerator) sendResult(result *lib.CallResult) bool {
  2. if gen.status == lib.STATUS_STARTED && gen.cancelSign == 0 {
  3. gen.resultCh <- result
  4. return true
  5. }
  6. logger.Warn("Ignore result: %s.\n",
  7. fmt.Sprintf(
  8. "Id=%d, Code=%d, Msg=%s, Elapse=%v",
  9. result.Id, result.Code, result.Msg, result.Elapse))
  10. return false
  11. }

回顾asyncCall方法的实现,sendResult方法正是在其中被调用的。可以看到,我们刚刚所说的充分条件被满足的情况,即是载荷发生器的状态为lib.STATUS_STARTEDcancelSign字段的值为0。也就是说,把cancelSign字段的值设置为1,就相当于阻止所有还未被执行完成的异步调用过程向调用结果通道发送调用结果。换句话说,对cancelSign字段的设置致使载荷发生器放弃了这些调用结果。

下面我们再来看载荷发生器的手动停止流程。我们可以通过调用载荷发生器的Stop方法来手动地停止它。具体的停止方式与自动停止的方式如出一辙,也是向作为载荷发生器字段之一的stopSign通道发送一个元素值。Stop方法的实现如下:

  1. func (gen *myGenerator) Stop() (uint64, bool) {
  2. if gen.stopSign == nil {
  3. return 0, false
  4. }
  5. if gen.status != lib.STATUS_STARTED {
  6. return 0, false
  7. }
  8. gen.status = lib.STATUS_STOPPED
  9. gen.stopSign <- 1
  10. callCount := <-gen.endSign
  11. return callCount, true
  12. }

可以看到,如果stopSign通道还未被初始化,就意味着载荷发生器还未被启动。这时理应忽略掉后续的操作。而在载荷发生器的状态不等于lib.STATUS_STARTED的情况下,我们也不应该向stopSign通道发送停止信号。因为,这样的载荷发生器可能还未被启动,也可能已经被停止。无论怎样,后续的操作都应该被忽略。如果前面这两项检查都通过了,那么Stop方法就会先把当前载荷发生器的状态设置为lib.STATUS_STOPPED。之所以先设置其状态,是为了避免并发地调用Stop方法可能造成的相关操作被重复执行的情况。在设置好状态之后,Stop方法会向stopSign通道发送停止信号,然后再试图从endSign通道处接收代表了调用执行计数的元素值。一旦从中接收到了该计数,那么就说明停止信号已被处理完毕。最后,Stop方法会把调用执行计数和true返回给其调用方。

注意,在Stop方法中被使用的endSign通道是一个载荷发生器的字段,而不是一个局部变量。这就是我们在前面提及的需要将endSign通道的作用域提升的情况。因为如果在启动载荷发生器的时候传递给genLoad方法的参数endSign是一个局部变量,那么我们就不能在Stop方法中使用它了,且Stop方法的调用方也就无法得到确切的调用执行计数了。因此,在myGenerator结构体类型的声明中,我们还应该再加入一个字段的声明:

  1. endSign chan uint64 // 完结信号的传递通道,同时被用于传递调用执行计数。

至此,该类型的字段的数量已经增至11个,即除了在上一小节中列罗的那9个字段外,还新增了本小节提及的cancelSignendSign

在有了endSign字段之后,我们应该在Start方法中设置载荷发生器的状态为lib.STATUS_STARTED之前对它的endSign字段进行初始化:

  1. // 初始化完结信号通道
  2. gen.endSign = make(chan uint64, 1)

而在设置已启动状态之后,我们还应该把该字段的值传递给genLoad方法:

  1. // 生成载荷
  2. logger.Infoln("Generating loads...")
  3. gen.genLoad(throttle, gen.endSign)

或者,我们更应该直接修改一下genLoad方法的声明:

  1. func (gen *myGenerator) genLoad(throttle <-chan time.Time)

并修改上述调用语句:

  1. gen.genLoad(throttle)

因为该方法中的代码已经可以直接使用作为载荷发生器字段的endSign通道了。

注意,由于genLoad方法所代表的控制流程是被同步执行的,所以在载荷发生器自动停止的流程中,endSign字段的第一个作用是无效的。也就是说,在这样的流程中,它仅起到了传递调用执行计数的作用。在Start方法的最后(也就是紧随对genLoad方法的调用之后),我们是这样编码的:

  1. // 接收调用执行计数
  2. callCount := <-gen.endSign
  3. gen.status = lib.STATUS_STOPPED
  4. logger.Infof("Stopped. (callCount=%d)\n", callCount)

genLoad方法被执行完毕之后,Start方法中代码会试图从endSign通道中接收元素值。这一接收操作总会立即成功。因为在genLoad方法被执行完毕之前,endSign通道中一定会缓冲有一个元素值。在该接收操作完成后,载荷发生器的停止状态会立即被设置。最后,从endSign通道接收到的调用执行计数会被记录到日志中。由logger变量的初始化方式可知,该日志会被打印到标准输出上。

纵观前面对载荷发生器的实现类型myGenerator的代码的展示和描述可知,它的每一个字段都是不可或缺的。通过它们,我们将载荷发生器中的各个组件有机地联系在了一起。根据使用方指定的timeoutNslpsdurationNs的值,我们设定了载荷发生器的行为。在异步地执行多个调用过程的流程中,我们使用其lib.GoTickets类型的字段tickets严格地限制住了专用Goroutine的数量。不过,为了达到这一目的,我们还比较精细地设计了该流程的具体实现方式。其中涉及了定时器的高级用法。字段stopSignendSigncancelSign对于载荷发生器的可控性的实现来说都是十分关键的。有了对它们的联合使用,才使得载荷发生器能够被安全地停止。其中,stopSign通道和endSign通道都充分起到了传递关键数据和状态的作用。而对cancelSign字段的类型的选择则充分体现了我们对直接被多个Goroutine共享的变量的考量。在充分的考量之下,我们决定将该字段的类型设定为byte,而不是某一个通道类型。在编写并发程序的过程中,我们总是应该小心应对,并以最直接、简单和安全的方式来组织代码。此外,在并发程序中,异步的传递结果总是有必要的。使用通道是跨Goroutine传递数据的绝佳选择。在载荷发生器中,我们就是通过它的通道类型的字段resultCh来收集和传递各个异步调用过程的结果的。

好了,我们已经悉数展现和解释了实现一个载荷发生器所需的主要实现代码。这些代码都被放在goc2p项目的loadgen代码包及其子代码包中。读者可以在需要时对照阅读。

在下一小节中,我们会利用之前讲到的知识编写针对载荷发生器的测试源码文件。与此同时,我们还会编写一个lib.Caller接口的实现类型,并使用这个类型来完成对载荷发生器的功能测试。

7.3.5 调用器和功能测试

载荷发生器的调用器应该由使用者给定。这样才能够使载荷发生器具有良好的可扩展性。使用者应该知道怎样生成可以施加于被测软件的载荷、怎样向被测软件发送载荷,以及怎样验证被测软件对载荷做出的响应。这3个行为的声明就组成了我们先前提到的调用器接口loadgen/lib.Caller

载荷发生器的使用者应该先开发出调用器的实现,然后再使用载荷发生器为被测软件做性能测试。我们下面会简单展示并描述一个调用器的实现过程,然后通过它初始化一个载荷发生器并对被测软件进行测试。注意,我们进行此测试的目的并不是要测试被测软件,而是要以此检验载荷发生器的功能。因此,我们除了要编写出一个有效的调用器实现之外,还要开发出一个简单的被测软件。只有形成了这样一个闭环之后,我们才能真正地使载荷发生器运行起来,并以此检查其体现的功能是否完全符合我们的设计初衷。

假设有这样一个被测软件,它提供了一个基于网络的API。该API的功能就是根据请求中的参数进行简单且有限的算术运算(针对整数的加减乘除运算),并将结果作为响应返回给请求方。这个被测软件相当简单,所以我们的调用器实现也不会太复杂。读过上一章的读者可能立刻想到使用Go语言的标准库中提供的Socket编程API来实现它们。的确,我们需要的就是这个。

为了贴合本节的主题,我并不想在被测软件的具体实现上耗费笔墨。但是,既然调用器需要与它进行通讯,那么我们还是有必要对请求和响应的结构做一些介绍的。为了简化对它们的组装和解析的操作,我们会使用标准库的encoding/json代码包中的API。这些API既可以让我们非常方便地把某个结构体类型的实例转换成json格式的普通文本数据,又可以把符合格式要求的json数据转换成对应的结构体类型的实例。在确定了这样的数据转换方式之后,我们需要声明两个分别代表请求和响应的结构体类型。请看下面的代码:

  1. type ServerReq struct {
  2. Id int64
  3. Operands []int
  4. Operator string
  5. }
  6. type ServerResp struct {
  7. Id int64
  8. Formula string
  9. Result int
  10. Err error
  11. }

结构体类型ServerReq表示了请求的结构。它包含了3个字段。其中,字段Id的值会唯一标识一个请求。而字段OperandsOperator则分别代表了多个运算数和一个运算符。根据Operands字段的类型,读者应该可以得知这里只允许针对整数的算术运算。虽说Operator字段的类型是string,但是它允许的值是非常有限的,即“+”、“-”、“*”和“/”。这4个值分别代表了加、减、乘和除。如果请求中实际给定的运算符不在此范围之内,那么被测软件肯定不会返回预期的响应。

再来看代表响应结构的结构体类型ServerResp。它的字段Id的值应该唯一标识一个响应。它的值应该和与之对应的请求的Id字段的值一致。字段Result的值需要表示请求中要求的算术运算的结果值。我们已经知道,在Go语言中,两个整数经由“/”运算之后所得到的结果值也会是一个整数。因此该字段的类型是适当的。字段Formula的值会代表相应的运算式子,例如:2 + 4 + 5 = 11。最后,字段Err是一个error类型的字段。如果被测软件在处理请求的过程中出现了错误,那么该字段的值就会表示相应的错误。

我们把上述两个结构体类型的声明以及被用于测试载荷发生器的调用器和被测软件的实现代码都放到了代码包loadgen/testhelpler中。这些代码需要用到loadgen/lib包中的API。为了不引起混淆和编写方便,我们在导入语句中把该代码包的别名设定为了loadgenlib。在本小节后面的部分中出现的带有loadgenlib.前缀的限定标识符代表的就是loadgen/lib包中的程序实体。

1. 调用器实现的基本结构

在了解了请求和响应的结构之后,我们开始着手编写调用器的实现。调用器需要通过TCP协议与被测软件通讯。因此,我们将这个调用器接口的实现类型命名为TcpComm(TCP Communicator的一种缩写形式)。TcpComm类型的结构非常简单,因为在其中只需要存储被测软件的网络地址:

  1. type TcpComm struct {
  2. addr string
  3. }

字段addr的值一般由IP地址(或主机名)和端口号组成,形如:"127.0.0.1:8080"TcpComm类型(更确切地说是*TcpComm类型)需要拥有3个公开的方法声明。这3个公开方法的声明应该与loadgenlib.Caller接口中的方法声明一一对应。我们先来看BuildReq方法。

2. BuildReq方法

该方法应该负责生成并返回一个loadgenlib.RawReq类型的值。从该方法的签名上来看,这个生成规则是需要调用器的编写者自己定义的。读者还记得loadgenlib.RawReq类型的声明吗?它包含了两个字段——int64类型的Id和[]byte类型的Req。注意,在这里,这个Req字段的值就应该代表与某个ServerReq类型值对应的json格式的普通文本数据。下面是*TcpComm类型的BuildReq方法的具体实现:

  1. func (comm *TcpComm) BuildReq() loadgenlib.RawReq {
  2. id := time.Now().UnixNano()
  3. sreq := ServerReq{
  4. Id: id,
  5. Operands: []int{
  6. int(rand.Int31n(1000)),
  7. int(rand.Int31n(1000))},
  8. Operator: func() string {
  9. op := []string{"+", "-", "*", "/"}
  10. return op[rand.Int31n(100)%4]
  11. }(),
  12. }
  13. bytes, err := json.Marshal(sreq)
  14. if err != nil {
  15. panic(err)
  16. }
  17. rawReq := loadgenlib.RawReq{Id: id, Req: bytes}
  18. return rawReq
  19. }

我们已经知道,ServerReq类型和loadgenlib.RawReq类型的Id字段的类型都是int64。因此它们可以被用来存储非常大的有符号整数。为了保持Id的唯一性,我们使用时间戳来作为它的值。这个时间戳需要很高的精度,可以表示到纳秒。因为在高并发的情况下,同一秒种甚至同一毫秒内都可能有很多原始请求被生成出来。time包的函数Now可以返回一个代表了调用它的那个时刻的Time类型值。该值有个名为UnixNano的方法,可以返回一个代表该时刻的纳秒数。这个纳秒数是从1970年1月1日的零时整开始算起的。我们就用它来充当Id的值。

怎样用复合字面量来初始化结构体类型的值,读者应该已经相当熟悉了。但是,这里比较特别的是,我们应该尽可能地体现出ServerReq类型值的随机性。这样才能提供更高的测试覆盖度。因此,对于OperandsOperator这两个字段的赋值,我们都使用了伪随机的算法。

首先来看Operands字段。原则上来说,我们可以为它赋予一个任意长度的[]int类型值。但是,我们在这里偷了一点懒,只是初始化了一个包含两个元素值的[]int类型值。不过,我们使用了math/rand包中的Int31n函数来生成其中的元素值。math/rand.Int31n函数可以在给定的范围内生成一个伪随机数。在这里,这个范围是[0,1000)。

对于Operator字段,我们使用一个针对匿名函数的调用表达式来代表它的值。这个匿名函数的唯一结果的类型是string类型的。它也是该表达式的类型。由于Operator字段的值的允许范围非常有限,所以我们很容易就此进行随机的选择。这里依然使用了math/rand.Int31n函数。

在生成好一个ServerReq类型值之后,我们需要把它转换为json格式的普通文本。这样才能够用它来给loadgenlib.RawReq类型的Req字段赋值。encoding/json代码包的函数Marshal可以实现这种转换。我们只要在调用它的时候把一个结构体类型值作为参数传给它就可以了。encoding/json.Marshal返回两个结果,第一个结果会是代表了转换后的文本的[]byte类型值,而第二个结果则是代表了可能发生的错误的error类型值。如果第二个结果值为nil,那么就说明转换是成功的。在转换失败的情况下我们引发了一个运行时恐慌。因为这种情况不应该发生。在载荷发生器中,这个运行时恐慌会被替换为一个代表了调用致命错误的调用结果。还记得*myGenerator类型的asyncCall方法中的那条defer语句吗?它会负责这一替换。

至于BuildReq方法的最后两条语句就很好理解了。我们在前面的准备工作的基础上生成了一个loadgenlib.RawReq类型值,并将它作为该方法的结果值返回。虽说我们拿到了这样一个值,但是实际上我们只需要其中的Req字段的值。而它的Id字段则只是为了方便载荷发生器的提取和鉴别而添加的。我们在讲解*myGeneratorasyncCall方法的时候展示过相关的代码。

3. Call方法

调用器的Call方法接受两个参数。参数req即代表了请求内容,其类型为[]byte。而time.Duration类型的参数timeoutNs则代表了超时时间。它的值应该与载荷发生器的timeoutNs字段的值一致。把它传给Call方法的含义是告诉调用器要进行超时判断。不过,这并不是强制的。因为载荷发生器已经采取了相应的措施来实时地判断调用超时。

下面,我们就来看看*TcpCommCall方法的具体实现:

  1. func (comm *TcpComm) Call(req []byte, timeoutNs time.Duration) ([]byte, error) {
  2. conn, err := net.DialTimeout("tcp", comm.addr, timeoutNs)
  3. if err != nil {
  4. return nil, err
  5. }
  6. _, err = write(conn, req, DELIM)
  7. if err != nil {
  8. return nil, err
  9. }
  10. return read(conn, DELIM)
  11. }

代码包net的方法DialTimeout被用来建立网络通讯。它的特点是可以设定代表超时时间的纳秒数。这使得参数timeoutNs有了用武之地。当已达到超时时间但还未完成通讯的建立的时候,该方法的第二个结果值就会是一个代表了操作超时的error类型值。如果该方法的第二个结果值是nil,那么它的第一个结果值就会是一个代表了通讯连接的net.Conn类型值。我们需要在这里判断第二个结果值并做出相应的处理。如果通讯建立成功,我们就先将请求数据(即我们在BuildReq方法中生成的loadgenlib.RawReq类型值的Req字段的值)写入到连接中,然后在成功后再等待并从连接中读取响应数据。这两个操作分别由write函数和read函数负责。我们在讲Socket的时候已经详细讲解了相关的细节,所以在这里就不展开这两个函数的实现了。

这里还有一点需要注意。我们知道,基于TCP协议的通讯是使用字节流来传递上层给予的消息的。它会根据具体情况为消息分段。但是,这并不意味着参与通讯的另一方可以依此来感知消息的分界。因此,我们需要显式地为请求数据添加结束符。传给write方法和read方法的参数DELIM就代表了这个结束符。这两个方法会使用它来分离单个的请求或响应。在loadgen/testhelpler代码包中有该常量的声明:

  1. const (
  2. DELIM = '\n'
  3. )

4. CheckResp方法

类型*TcpComm的方法CheckResp的声明是这样的:

  1. func (comm *TcpComm) CheckResp(
  2. rawReq loadgenlib.RawReq,
  3. rawResp loadgenlib.RawResp) *loadgenlib.CallResult

调用CheckResp方法的时机是在载荷发生器接收到被测软件的响应之后。如果原始响应中没有携带任何错误,那么载荷发生器就会调用它来对原始响应进行进一步的检查,并根据检查结果设置其返回的调用结果的Code字段和Msg字段的值。

CheckResp方法的方法体的开始处,我们需要先对将会被该方法返回的调用结果进行必要的初始化,像这样:

  1. var commResult loadgenlib.CallResult
  2. commResult.Id = rawResp.Id
  3. commResult.Req = rawReq
  4. commResult.Resp = rawResp

并且,在开始检查原始响应之前,我们必须要把参数rawReq的字段Req的值和rawResp的字段Resp的值转换为相应的结构体类型值。这需要用到json.Unmarshal函数。若以前者为例,则代码如下:

  1. var sreq ServerReq
  2. err := json.Unmarshal(rawReq.Req, &sreq)

我们需要把rawReqReq字段的值和刚刚声明的ServerReq类型的变量的指针值作为参数传给json.Unmarshal函数。该函数在被执行完成之后会返回一个error类型值。如果在转换过程中发生了错误,那么代表结果的变量err的值将会是非nil的。若发生这种情况,那么我们就会这样做:

  1. if err != nil {
  2. commResult.Code = loadgenlib.RESULT_CODE_FATAL_CALL
  3. commResult.Msg =
  4. fmt.Sprintf("Incorrectly formatted Req: %s!\n", string(rawReq.Req))
  5. return &commResult
  6. }

可以看到,我们在对调用结果commResult的字段CodeMsg进行必要的设置后,直接将它作为当前方法的结果返回了。注意,因为rawReqReq字段的值就是由相应的ServerReq类型值经调用json.Marshal函数而得来的,所以此处的转换不应该发生任何错误。所以,如果发生了错误,那么我们就会视它为一个致命的调用错误。

对于rawResp的字段Resp的值的转换,我们使用同样的方法。只不过在出错时,我们为commResult的字段CodeMsg赋予不同的值。相关代码如下:

  1. var sresp ServerResp
  2. err = json.Unmarshal(rawResp.Resp, &sresp)
  3. if err != nil {
  4. commResult.Code = loadgenlib.RESULT_CODE_ERROR_RESPONSE
  5. commResult.Msg =
  6. fmt.Sprintf("Incorrectly formatted Resp: %s!\n", string(rawResp.Resp))
  7. return &commResult
  8. }

限定标识符loadgenlib.RESULT_CODE_ERROR_RESPONSE代表了响应内容错误时的响应代码。

在上述工作完成之后,*TcpComm类型的CheckResp方法中的代码就开始对sresp变量的正确性进行检查。具体的检查项目如下。

  • 检查srespId字段的值是否与变量sreqId字段的值相等。也就是说这个原始响应是否与该原始请求相对应。如果不是,那么就说明这里的请求和响应是不匹配的。该项检查未通过。

  • 检查srespErr字段的值是否为非nil。如果是,就说明被测软件在处理请求的过程中发生了错误。该项检查未通过。

  • 检查变量srespResult字段的值是否正确。也就是说,它是否为原始请求中的运算符施加于同在其中的若干个运算数之后所得到的结果值。我们在这里用到的方法应该与被测软件采用的运算方法等效。若该字段的值不正确,则不能通过该项检查。

只要某一项检查未通过,那么后续的检查就会被忽略。该方法会立即根据实际情况设置commResult的变量CodeMsg的值,并将commResult作为结果值返回。若上述检查都通过了,则说明原始响应sresp是完全正确的。这时,该方法同样会设置调用结果的相应字段:

  1. commResult.Code = loadgenlib.RESULT_CODE_SUCCESS
  2. commResult.Msg = fmt.Sprintf("Success. (%s)", sresp.Formula)

设置完成后,CheckResp方法也同样会把commResult变量的值作为其结果值返回。

至此,我们完成了对结构体类型TcpComm以及为了实现loadgenlib.Caller接口而声明的公开方法的编写。下面,我们就使用这个调用器实现来对载荷发生器进行测试。

5. 测试载荷发生器

为了测试载荷发生器的实现类型*myGenerator,我们应该首先建立一个测试源码文件。与myGenerator类型相关的代码都被放到了loadgen代码包的库源码文件gen.go中。因此,与之相对应的测试源码文件就应该被命名为gen_test.go,且同在loadgen代码包中。

我们在第5章讲过,若要编写被用于测试的代码就需要用到testing代码包中的API。并且,在测试源码文件中,被用来进行功能测试的函数的名称应该以“Test”为前缀,并接受*testing.T类型的参数。因此,我们在gen_test.go文件中声明了这样一个函数:

  1. func TestStart(t *testing.T)

下面我们来编写该函数的函数体。首先,为了让载荷发生器在时机到来之时尽可能快地将载荷发送出去,我们使用runtime代码包的GOMAXPROCS函数来设置该测试可以使用的P的最大数量。我们在前面说过,设置过大的P最大数量并无益于并发程序的性能。所以,我们只根据当前计算机的CPU核心数量来设置确定该数量。runtime.GOMAXPROCS函数的调用代码如下:

  1. // 设置P最大数量
  2. runtime.GOMAXPROCS(runtime.NumCPU())

注意,在测试载荷发生器之前,我们需要先行启动被测软件。代码如下:

  1. // 初始化服务器
  2. server := thelper.NewTcpServer()
  3. defer server.Close()
  4. serverAddr := "127.0.0.1:8080"
  5. t.Logf("Startup TCP server(%s)...\n", serverAddr)
  6. err := server.Listen(serverAddr)
  7. if err != nil {
  8. t.Fatalf("TCP Server startup failing! (addr=%s)!\n", serverAddr)
  9. t.FailNow()
  10. }

为了方便,我们在导入代码包的时候把loadgen/testhelperloadgen/lib的别名分别设定为了thelperloadgenlib。另外,由于我们在前面并没有展开被测软件的实现代码,所以在这里会对初始化并启动被测软件的过程进行简单的描述。

函数thelper.NewTcpServer的功能是创建并初始化一个TCP服务器(即我们所说的被测软件)。紧随其后的defer语句保证了在该功能测试方法结束之前该TCP服务器会被关闭。我们指定该服务器的监听IP为127.0.0.1,监听端口为8080,并以此网络地址来启动这个服务器。如果它在被启动的过程中有错误发生,那么我们就调用t.Fatalf函数打印出错误信息并使当前的测试立即失败。

在这个TCP服务器被启动成功之后,我们就需要着手调用器的初始化工作了。首先,我们使用thelper.NewTcpComm函数来创建和初始化一个基于TCP通讯协议的调用器。我们在前面未提及该函数的原因是它相当地简单。该函数的声明如下:

  1. func NewTcpComm(addr string) loadgenlib.Caller {
  2. return &TcpComm{addr: addr}
  3. }

注意,thelper.NewTcpComm函数的结果的类型为loadgenlib.Caller。这是为了确保*TcpComm是该接口的一个实现类型。

据此,我们应该这样来初始化需要被传给载荷发生器的调用器:

  1. // 初始化调用器
  2. comm := thelper.NewTcpComm(serverAddr)

当然,除了调用器之外,初始化载荷发生器时还需要其他一些参数。这些参数的声明如下:

  1. resultCh := make(chan *loadgenlib.CallResult, 50)
  2. timeoutNs := 3 * time.Millisecond
  3. lps := uint32(200)
  4. durationNs := 12 * time.Second

变量resultCh代表了调用结果通道。我们把它的容量设置为50,实际上并没有什么特殊的原因。在该功能测试方法中,我们会在启动载荷发生器之后立即不间断地尝试从该通道中接收元素值。因此,原则上说,这里的调用结果通道的容量可以被设置得很小。具体的数值取决于相应的发送操作和接收操作进行的时机和频率。而这里设定的容量值只是一个宽泛的估值而已。

对于在resultCh下面的那3个变量,我们应该已经很熟悉了。使用这些参数,我们就可以这样创建出一个载荷加载器:

  1. gen, err := NewGenerator(
  2. comm,
  3. timeoutNs,
  4. lps,
  5. durationNs,
  6. resultCh)
  7. if err != nil {
  8. t.Fatalf("Load generator initialization failing: %s.\n",
  9. err)
  10. t.FailNow()
  11. }

我们在这里使用了loadgen包中的NewGenerator函数。要记得检查它返回的第二个结果值。如果err不为nil,那么我们就只好立即停止当前测试了。否则,就可以这样启动载荷发生器gen

  1. go gen.Start()

我们以异步的方式来执行gen.Start方法的原因是,可以避免因调用结果通道resultCh被填满而导致的相关Goroutine(即载荷发生器中的那些专用Goroutine)的阻塞。如果我们不在gen.Start方法被执行的同时进行相应次数的针对resultCh通道的接收操作,那么它们就会被一直阻塞。还记得吗?在专用Goroutine中,如果对调用结果通道的发送操作未完成,那么它就不会将Goroutine票归还给Goroutine票池。如果与此同时Goroutine票池中的Goroutine票被用尽了,那么就会使后续的异步调用过程无法进行。进一步说,载荷发生器会先确保拿到一张Goroutine票,然后再启用一个专用Goroutine来异步地执行调用过程。如果Goroutine票池空了,那么载荷发生器的控制流程就会被永久地阻塞在对它的asyncCall方法的某次调用操作上。这样的话,无论是开启节流阀还是发送停止信号都不会使载荷发生器产生任何响应。更糟的是,这也会导致像功能测试函数TestStart这样的载荷发生器的使用方也被永久阻塞!

我意识到,这是一个严重的BUG!它即是我们测试载荷发生器(实际上,还没真正开始)的过程中发现的第一个问题。我们不应该仅仅通过异步的执行gen.Start方法来绕过这个问题,而应该立即解决它!

清除这个BUG其实并不困难,我们只需将载荷发生器的控制流程以及后续的接收调用执行计数等操作也异步化,即把在它的Start方法中的如下代码:

  1. // 生成载荷
  2. logger.Infoln("Generating loads...")
  3. gen.genLoad(throttle)
  4. // 接收已发载荷总数
  5. genCount := <-gen.endSign
  6. gen.status = lib.STATUS_STOPPED
  7. logger.Infof("Stopped. (genCount=%d)\n", genCount)

改为:

  1. go func() {
  2. // 生成载荷
  3. logger.Infoln("Generating loads...")
  4. gen.genLoad(throttle)
  5. // 接收调用执行计数
  6. callCount := <-gen.endSign
  7. gen.status = lib.STATUS_STOPPED
  8. logger.Infof("Stopped. (callCount=%d)\n", callCount)
  9. }()

这样会使载荷发生器的Start方法在被调用后,不会等待控制流程的执行的完成,而是直接返回。我们刚刚发现的那个严重的BUG就这样被解决了。

在清除了这个BUG之后,我们继续编写TestStart函数。现在,我们可以直接调用gen.Start方法了:

  1. // 开始!
  2. t.Log("Start load generator...")
  3. gen.Start()

该方法会很快的返回。

在启动载荷发生器之后,我们就要开始对调用结果进行收集和展示了。载荷发生器中启用的每一个专用Goroutine都会在调用过程结束之后向调用结果通道resultCh发送调用结果。因此,我们在这里只需做到及时地接收它们即可,如下所示:

  1. // 显示调用结果
  2. countMap := make(map[loadgenlib.ResultCode]int)
  3. for r := range resultCh {
  4. countMap[r.Code] = countMap[r.Code] + 1
  5. if printDetail {
  6. t.Logf("Result: Id=%d, Code=%d, Msg=%s, Elapse=%v.\n",
  7. r.Id, r.Code, r.Msg, r.Elapse)
  8. }
  9. }

这段代码不断地从resultCh通道中获取调用结果。其中,printDetail是我们在gen_test.go文件中声明的一个bool类型的变量。如果该变量的值为true,那么上面的代码就会打印出接收到的所有调用结果的细节信息。另一方面,这段代码总会以调用结果的Code字段的值作为依据对它们进行分类和计数,并把计数信息存入countMap变量代表的字典中。这样的统计让我们可以方便地展示出各类调用结果的数量:

  1. var total int
  2. t.Log("Code Count:")
  3. for k, v := range countMap {
  4. codePlain := loadgenlib.GetResultCodePlain(k)
  5. t.Logf(" Code plain: %s (%d), Count: %d.\n",
  6. codePlain, k, v)
  7. total += v
  8. }

函数loadgenlib.GetResultCodePlain的功能是返回一个string类型值。该值实际上是一个短语。它解释了作为该函数参数的响应代码的含义。在各类调用结果之中,我们最关心的应该是响应代码为loadgenlib.RESULT_CODE_SUCCESS的调用结果的数量及其占调用结果总数的比例。因此,有了下面这段代码:

  1. t.Logf("Total load: %d.\n", total)
  2. successCount := countMap[loadgenlib.RESULT_CODE_SUCCESS]
  3. tps := float64(successCount) / float64(durationNs/1e9)
  4. t.Logf("Loads per second: %d; Treatments per second: %f.\n", lps, tps)

这里的tps的含义是被测软件平均每秒有效的处理(或称响应)载荷的数量。

现在我们来看看该测试的实际输出。若printDetail变量的值为false,那么在我们通过go test命令执行gen_test.go中的TestStart函数之后,标准输出上会出现如下内容(这里只截取了最后一部分):

  1. --- PASS: TestStart (12.07 seconds)
  2. gen_test.go:21: Startup TCP server(127.0.0.1:8080)...
  3. gen_test.go:37: Initialize load generator (timeoutNs=3ms, lps=200, durationNs=12s)...
  4. gen_test.go:51: Start load generator...
  5. gen_test.go:65: Code Count:
  6. gen_test.go:69: Code plain: Success (0), Count: 2364.
  7. gen_test.go:69: Code plain: Call Timeout Warning (1001), Count: 28.
  8. gen_test.go:73: Total load: 2392.
  9. gen_test.go:76: Loads per second: 200; Treatments per second: 197.000000.
  10. PASS
  11. ok loadgen 12.278s

从这些输出内容上我们可以看出,在这次对被测软件的性能测试中,它有效处理的载荷的数量的比例为98.5%。更确切地说,有1.5%的载荷的响应没有被测软件及时送回。

好了,在TestStart函数中,我们主要对载荷发生器的启动流程、控制流程和自动停止流程进行了测试。从测试日志上看,载荷发生器表现良好。

另一方面,我们已经知道,载荷发生器还可以被手动的停止。下面我们就来对这部分功能进行测试。该测试由gen_test.go文件中的TestStop函数代表。

TestStop函数中,被用于初始化服务器、调用器和载荷发生器,以及启动载荷发生器的代码与TestStart函数中的代码几乎一致。唯一不同的是,被测软件的网络地址由127.0.0.1:8080被改为了127.0.0.1:8081。这是为了彻底地避免两个测试的相互干扰。

除此之外,在TestStop函数的显示调用结果的部分中,我们也稍微做了一些修改:

  1. // 显示调用结果
  2. countMap := make(map[loadgenlib.ResultCode]int)
  3. count := 0
  4. for r := range resultCh {
  5. countMap[r.Code] = countMap[r.Code] + 1
  6. if printDetail {
  7. t.Logf("Result: Id=%d, Code=%d, Msg=%s, Elapse=%v.\n",
  8. r.Id, r.Code, r.Msg, r.Elapse)
  9. }
  10. count++
  11. if count > 3 {
  12. gen.Stop()
  13. }
  14. }

我们使用变量count来实时地对接收到的调用结果进行计数。并且,我们在第4次迭代即将完成的时候手动地停止载荷发生器。

按照我们的预期,载荷发生器的停止会导致调用结果通道的关闭,从而导致这个for代码块的执行的结束。但是,当我们真正运行该测试的时候却发现,情况并不是这样。相关的测试日志如下:

  1. === RUN TestStop
  2. 2014/04/18 16:11:57 [INFO] loadgen.NewGenerator : (gen.go:34) - New a load generator...
  3. 2014/04/18 16:11:57 [INFO] loadgen.NewGenerator : (gen.go:35) - Checking the parameters...
  4. 2014/04/18 16:11:57 [INFO] loadgen.NewGenerator : (gen.go:66) - Passed. (timeoutNs=3ms, lps=200, durationNs=12s)
  5. 2014/04/18 16:11:57 [INFO] loadgen.(*myGenerator).init : (gen.go:75) - Initializing the load generator...
  6. 2014/04/18 16:11:57 [INFO] loadgen.(*myGenerator).init : (gen.go:87) - Initialized. (concurrency=1)
  7. 2014/04/18 16:11:57 [INFO] loadgen.(*myGenerator).Start : (gen.go:215) - Starting load generator...
  8. 2014/04/18 16:11:57 [INFO] loadgen.(*myGenerator).Start : (gen.go:221) - Setting throttle (5ms)...
  9. 2014/04/18 16:11:57 [INFO] loadgen.(*myGenerator).Start : (gen.go:242) - Generating loads...
  10. 2014/04/18 16:11:57 [INFO] loadgen.(*myGenerator).handleStopSign : (gen.go:186) - Closing result channel...
  11. 2014/04/18 16:11:57 [INFO] loadgen.func·006 : (gen.go:256) - Stopped. (callCount=3)
  12. 2014/04/18 16:12:07 [INFO] loadgen.func·004 : (gen.go:228) - Stopping load generator...
  13. *** Test killed: ran too long (10m0s).
  14. FAIL loadgen 600.287s

可以看到,无论是载荷发生器的创建、初始化和启动还是载荷的生成和发送都是正常的。我们需要特别关注的是最后5行内容。倒数第五行和第四行的内容表示,在调用结果通道被关闭之后载荷发生器被停止,且停止之时的调用执行计数是3。这里的计数表示了控制流程(由载荷发生器的genLoad方法代表)中被用来异步地执行调用过程的那个循环,在第三次迭代即将结束的时候被终止了。

请注意,倒数第四行日志并不是载荷发生器的Stop方法中的日志记录语句打印出来的(该方法中没有这类语句),而是它的Start方法中的最后面的那条语句为之的。

这样是正确的吗?在回答这个问题之前,让我们再回顾一下载荷发生器的实现细节。还记得吗?我们在载荷发生器的Start方法和Stop方法中都包含了针对endSign通道(也就是那个被用来告知停止信号已被处理完毕的通道)的接收操作。但是,在载荷发生器即将处理完停止信号的时候只会向endSign通道发送一个元素值。如果我们在载荷发生器自动停止之前通过调用其Stop方法来手动停止它,那么endSign通道中的那个唯一的元素值就会被Start方法中的那个接收操作取走。因为该接收操作是先被进行并阻塞的。这就导致了Stop方法中的那个接收操作被永久阻塞。因为不会再有任何代码向endSign通道发送元素值了。显然,我们先前在此处的设计并不合理。

由于这样的设计缺陷,在前面的测试日志中的倒数第四行内容被打印出来之后,功能测试函数TestStop的执行就被停滞了。载荷发生器的控制流程所在的Goroutine在记录该行日志之后就被运行结束了。而调用它的Stop方法的代码所在的Goroutine却被阻塞了。这个Goroutine即是执行TestStop函数的那一个。

可以看到,在倒数第四行内容被打印出来的数十秒之后,倒数第三行内容出现了。这是由于在载荷发生器的Start方法中初始化的定时器执行了我们给定的那个匿名函数。但是,在这时,执行这个匿名函数已经没有任何意义了。因为载荷发生器的控制流程的执行已经结束了。而对TestStop函数的执行依然处于停滞状态。当前的功能测试已经无法完成,直到我们强行终止其所属的进程。如果我们没有这样做,那么等到该测试的运行时间达到10分钟之后,go test命令会自己结束它并宣告测试失败。这体现在前面展示的测试日志的最后两行内容上。

该问题是在我们对载荷发生器的功能进行测试的过程中发现的第二个问题。我们怎样才能解决它呢?我们已经知道了这个问题的症结所在,那就是两种载荷发生器停止方式之间的冲突。它们都以从endSign通道中成功接收到一个元素值作为载荷发生器已经被停止的确认。因此,最简单的方式就是,在即将处理完停止信号的时候向该通道连续发送两个相同的元素值(即表示了调用执行计数的值)。这样的解决方案既简单又清晰。

具体的代码修改工作可被分为两步。

  1. endSign通道的容量扩充至2。这样,我们就可以连续的向它发送两个元素值且不用担心会被阻塞了。相关代码在*myGenerator类型的Start方法中。修改后的代码为:
  1. // 初始化完结信号通道
  2. gen.endSign = make(chan uint64, 2)
  1. 将包含在载荷发生器的genLoad方法中的针对endSign通道的那两条发送语句都删除掉。然后,在handleStopSign方法的方法体的最后面添加两条与之相同的语句。修改后的handleStopSign方法和genLoad方法如下:
  1. func (gen *myGenerator) handleStopSign(callCount uint64) {
  2. gen.cancelSign = 1
  3. logger.Infof("Closing result channel...")
  4. close(gen.resultCh)
  5. gen.endSign <- callCount
  6. gen.endSign <- callCount
  7. }
  8. func (gen *myGenerator) genLoad(throttle <-chan time.Time) {
  9. callCount := uint64(0)
  10. Loop:
  11. for ; ; callCount++ {
  12. select {
  13. case <-gen.stopSign:
  14. gen.handleStopSign(callCount)
  15. break Loop
  16. default:
  17. }
  18. gen.asyncCall()
  19. if gen.lps > 0 {
  20. select {
  21. case <-throttle:
  22. case <-gen.stopSign:
  23. gen.handleStopSign(callCount)
  24. break Loop
  25. }
  26. }
  27. }
  28. }

在进行完上述修改之后,我们再使用go test运行测试源码文件gen_test.go就会得到正常的结果。

好了,我们现在可以说载荷发生器的实现类型loadgen.myGenerator,以及我们自定义的调用器的实现类型loadgen/testhelper.TcpComm在功能上已经是正确的了。作为一款简单的性能测试工具,该程序已经可以被投入使用了。当然,关于该程序是否可以完全正确地模拟极高的并发量,我们还没有进行深入的测试。读者如果有兴趣的话,可以为这个载荷发生器的实现类型编写性能测试。

除此之外,我们还可以为载荷发生器编写出面向命令行以及GUI(Graphical User Interface,图形用户界面)的用户接口。这可以大大增强其易用性。也许,我们需要为此添加一些API以使载荷发生器对扩展更加开放。

总之,我们在本节实现并详述的程序可以被用于简单的性能测试。它向我们展现了Go语言的Goroutine和Channel的常规用法和一些小技巧。从这个角度讲,它还是非常具有参考意义的。读者可以试着修改或扩展它以满足自己的实际需要。