6.2 多进程编程

在现代操作系统中,我们可以很方便地编写出多进程的程序。在多进程程序中,如果多个进程之间需要协作完成任务,那么进程间通讯的方式就是需要重点考虑的事项之一。这种通讯常被叫作IPC(Interprocess Communication)。不同版本的Unix及其衍生系统中所支持的IPC方法都不尽相同。同时,针对IPC制定的标准也不止一个。因此,为了简单和统一,我们在讨论IPC的概念和使用方法的时候只针对Linux操作系统。我们会详细地描述Go语言能直接操纵的那些IPC方法。

在Linux操作系统中,我们可以使用的IPC方法有很多种。从处理机制的角度看,它们可以被分为三大类,即基于通讯的IPC方法、基于信号的IPC方法以及基于同步的IPC方法。其中,基于通讯的IPC方法又被分为以数据传送为手段的IPC方法和以共享内存为手段的IPC方法。前者包括了管道(Pipe)和消息队列(Message Queue)。管道可以被用来传送字节流,而消息队列可以被用来传送结构化的消息对象。以共享内存为手段的IPC方法主要以共享内存区(Shared Memory)为代表。它是最快的一种IPC方法。基于信号的IPC方法就是我们常说的操作系统的信号(Signal)机制。它是唯一的一种异步的IPC方法。在基于同步的IPC方法中,最重要的就是信号灯(Semaphore)。

在本节,我们会详细地描述Go语言支持的IPC方法。它们是管道、信号和Socket。不过,在介绍它们之前,我们还需要先了解一些基本的概念。

6.2.1 进程

我们在介绍具体IPC方法之前,理所当然地要对进程本身进行解说。这对于我们更加深刻地理解各种IPC方法的概念和使用也是很有好处的。

1. 进程的定义

进程是Unix衍生操作系统(包括Linux操作系统)的根本,因为所有的代码都是在进程中执行的。我们通常把一个程序的执行称为一个进程。反过来讲,进程被用于描述程序的执行过程。因此,程序与进程成为了一对相依的概念。它们分别描述了一个程序的静态形式和动态特征。除此之外,进程还是操作系统进行资源分配的一个基本单位。

2. 进程的衍生

了解Unix/Linux系统编程的读者都应该知道,一个进程可以使用系统调用fork创建若干个新的进程。前者被称为后者的父进程,后者被称为前者的子进程。每个子进程都是源自它的父进程的一个副本。它会获得父进程的数据段、堆和栈的副本,并与父进程共享代码段。每一份副本都是独立的,子进程对属于它的副本的修改对其父进程和兄弟进程(同父进程)都是不可见的,反之亦然。全盘复制父进程的数据是相当低效的一种做法。Linux操作系统内核(以下简称内核)使用写时复制(Copy On Write,常被简称为COW)等技术来提高进程创建的效率。当然,刚被创建的子进程也可以通过系统调用exec把一个新的程序加载到自己的内存中,而原先在其内存中的数据段、堆、栈以及代码段就会被替换掉。在这之后,子进程执行的就会是那个刚刚被加载进来的新程序。

在Unix/Linux操作系统中,每一个进程都有父进程。所有的进程共同组成了一个树状结构。内核启动进程作为进程树的根并负责系统的初始化操作。它是所有进程的祖先。然而,这个进程也是有父进程的——就是它自己。如果某一个进程先于它的子进程结束,那么这些子进程将会被内核启动进程“收养”,成为它的直接子进程。

3. 进程的标识

为了管理进程,内核必须对每个进程的属性和行为进行详细的记录,包括进程的优先级、状态、虚拟地址范围以及各种访问权限,等等。更具体地说,这些信息都会被记录在每个进程的进程描述符中。进程描述符并不是一个简单的符号,而是一个非常复杂的数据结构。被保存在进程描述符中的进程ID(常被称为PID)是进程在操作系统中的唯一标识。进程ID为1的进程就是我们之前所说的内核启动进程。进程ID是一个非负整数且总是被顺序的编号。新被创建的进程的ID总是前一个被创建的进程的ID递增的结果。进程ID也是可以被重复使用的。当进程ID已达到其最大限值时,内核会从头开始查找已被闲置的进程ID并使用最先找到的那一个作为新进程的ID。另外,进程描述符中还会包含当前进程的父进程的ID(常被称为PPID)。

在Go语言中,我们可以使用标准库代码包os提供的API来查看当前进程的PID和PPID,像这样:

  1. pid := os.Getpid()
  2. ppid := os.Getppid()

注意,PID并不传达与进程有关的任何信息。它只是一个用来唯一标识进程的数字而已。进程的属性信息只被包含在内核中的、与PID对应的进程描述符中。而PPID在实际操作中也并没有太多用处。不过它确实体现了两个进程之间的亲缘关系。我们可以利用这一点做一些事情。比如,顺藤摸瓜地查找守护进程的踪迹。

进程ID对内核以外的程序非常有用。内核可以高效地把进程ID转换成对应进程的描述符。我们可以shell命令kill终止某个进程ID所对应的进程,还可以通过进程ID找到对应的进程并向它发送信号。这在本节的后面会讲述到。

4. 进程的状态

在Linux操作系统中,每个进程在不同时刻都可能会有不同的状态。这些进程可能的状态共有6个,分别是:可运行状态、可中断的睡眠状态、不可中断的睡眠状态、暂停状态或跟踪状态、僵尸状态和退出状态。下面我们分别对这几种状态进行简要的说明。

  • 可运行状态(TASK_RUNNING,简称为R):如果一个进程处在该状态,那么说明它将要、立刻或正在CPU上运行。运行的时机是不确定的。这会由进程调度器来决定。

  • 可中断的睡眠状态(TASK_INTERRUPTIBLE,简称为S):当进程正在等待某个事件(比如网络连接或信号灯)的发生时会进入此状态。这样的进程会被放入对应事件的等待队列中。当事件发生时,对应的等待队列中的一个或多个进程就会被唤醒。

  • 不可中断的睡眠状态(TASK_UNINTERRUPTIBLE,简称为D):此种状态与可中断的睡眠状态的唯一区别就是它是不可被打断的。这意味着处在此种状态的进程不会对任何信号作出响应。更确切地讲,发送给处于不可中断状态的进程的信号直到该进程从此状态转出才会被传递过去。进程处于此种状态通常是由于在等待一个特殊的事件。比如在等待同步的I/O操作(磁盘I/O等)的完成。I/O是Input/Output的缩写,在这里可以理解为对输入输出信息的处理。

  • 暂停状态或跟踪状态(TASK_STOPPED或TASK_TRACED,简称为T):向进程发送SIGSTOP信号就会使该进程转入暂停状态,除非该进程正处于不可中断的睡眠状态。向正处于暂停状态的进程发送SIGCONT信号会使该进程转向可运行状态。处于被跟踪状态的进程会暂停并等待跟踪它的进程对它进行操作。例如,我们使用调试工具GDB在某个程序中设置一个断点,而后对应的进程在运行过程中会在断点处停下来并等待被操作。这时,此进程就处于跟踪状态。跟踪状态与暂停状态非常类似。但是,向处于跟踪状态的进程发送SIGCONT信号并不能使它被恢复。只有当调试进程进行了相应的系统调用或者退出之后,它才能够被恢复。

  • 僵尸状态(TASK_DEAD-EXIT_ZOMBIE,简称为Z):处于此状态的进程即将要结束。该进程占用的绝大多数资源也都已经被回收。不过还有一些信息未被删除,比如退出码以及一些统计信息。保留这些信息是考虑到该进程的父进程可能需要它们。由于此时的进程主体已经被删除而只留下了一个空壳,故此状态常被称为僵尸状态。

  • 退出状态(TASK_DEAD-EXIT_DEAD,简称为X):在进程退出的过程中,有可能连退出码和统计信息都不需要被保留。造成这种情况的原因可能是显式地让该进程的父进程忽略掉SIGCHLD信号(当一个进程消亡的时候,内核会给其父进程发送一个SIGCHLD信号以告知此情况),也可能是该进程已经被分离。分离的含义是让子进程和父进程分别独立运行。分离后的子程序将不会再使用和执行与父进程共享的代码段中的指令,而是加载并运行一个全新的程序(我们讲“进程的衍生”的时候提到过)。在这些情况下,该进程在退出的时候就不会转入僵尸状态,而会直接转入退出状态。处于退出状态的进程会立即被干净利落地结束掉。它所占用的系统资源也会被操作系统自动回收。

进程在其生命周期内可能会产生一系列的状态变化。简单地说,进程的状态只会在可运行状态和非可运行状态之间转换。图6-1展示了一般情况下的进程状态转换。

{%}

图 6-1 Linux操作系统进程的状态转换

虽然暂停状态和调试状态极为相似,但是也可以把它们看成两个不同的状态。因此,Linux操作系统中的进程有7种可能的状态的说法也是正确的。

5. 进程的空间

一个用户进程(或者说我们的程序的执行实例)总会生存于用户空间中。这些进程可以做很多事,但是却不能与其所在的计算机的硬件进行交互。内核可以与硬件交互,但是它却生存在内核空间中。用户进程无法直接访问内核空间。用户空间和内核空间体现了Linux操作系统对物理内存的划分。换句话说,这两个空间指的都是操作系统在内存上划分出的一个范围。它们共同瓜分了操作系统能够支配的内存区域,如图6-2所示。

6.2.1 进程 - 图2

图 6-2 Linux操作系统对虚拟内存的划分

内存区域中的每一个单元都是有地址的。这些地址是由指针来标识和定位的。通过指针来寻找内存单元的操作也被称为内存寻址。指针是一个正整数,由若干个二进制位表示。具体的二进制位的数量由计算机(更确切地说是CPU)的字长所决定。因此,在32位计算机中可以有效标识2的32次方个内存单元,而在64位计算机中可以有效标识2的64次方个内存单元,正如图6-2所示。

这里所说的地址并非物理内存中的真实地址,它们被称为虚拟地址。而由虚拟地址来标识的内存区域又被称为虚拟地址空间,有时也被称为虚拟内存。回顾图6-2,用户空间虚拟地址的范围是从0到TASK_SIZE,而内核空间则占据了其余虚拟地址所代表的空间。TASK_SIZE相当于这两个空间的分界线。它实际上是一个特定的常数。它的值由所在的计算机的体系结构决定。在不同的计算机体系结构下,TASK_SIZE的值可能是不同的。注意,虚拟内存的最大容量与实际可用的物理内存的大小是无关的。内核和CPU会负责维护虚拟内存与物理内存之间的映射关系。

内核会为每个用户进程分配的是虚拟内存而不是物理内存。每个用户进程被分配到的虚拟内存总是在用户空间中的,而内核空间被留给内核专用。另外,每个用户进程都会认为分配给它的虚拟内存就是整个用户空间。一个用户进程不可能操纵另一个用户进程的虚拟内存,因为后者的虚拟内存对于前者来说是不可见的。换句话说,这些进程的虚拟内存几乎是彼此独立、互不干扰的。这是由于它们基本上被映射到了不同物理内存之上。

内核会把进程的虚拟内存划分为若干页(page)。而物理内存单元的划分由CPU负责。一个物理内存单元被称为一个页框(page frame)。不同的进程的大多数页都会与不同的页框相对应,如图6-3所示。

{%}

图 6-3 进程的虚拟内存与物理内存

图中的进程A和进程B的大多数页都分别与物理内存中的不同页框相对应。但是,进程A的页7与进程B的页8共享了同一个页框(即最下面的一个页框)。这种页框的共享是被允许的。实际上,这正是作为IPC方法之一的共享内存区的基础。另外,我们看到,不论进程A还是进程B都有一些页没有与任何一个页框对应。这也是有可能的。这也许是由于该页没有数据或者数据还不需要被使用,也许是该页已经被换出至磁盘(确切地说,是Linux文件系统中的swap分区)中。

6. 系统调用

我们在前面说过,用户进程生存在用户空间中且无法直接操纵计算机的硬件,但是在内核空间中的内核却可以做到。用户进程无法直接访问内核空间,也无法随意指使内核去做它能做到的一些事。但是为了使用户进程能够使用操作系统更底层的功能,内核会暴露出一些接口以供它们使用。这些接口是用户进程使用内核功能(包括操纵计算机硬件)的唯一手段,也是用户空间和内核空间之间的一座桥梁。用户进程使用这些接口的行为被称为系统调用,但在很多时候“系统调用”这个词也指代内核提供的这些接口。注意,虽然系统调用也是由函数代表的,但它与普通的函数是有明显的区别的。系统调用是向内核空间发出的一个明确的请求,而普通的函数只是定义了如何获取一个给定的服务。更重要的是,系统调用会导致内核空间中的数据的存取和指令的执行,而普通函数却只能在用户空间中有所作为。当然,如果在一个函数的函数体中包含了系统调用,那么它的执行也将涉及对内核空间的访问。但是这种访问仍然是通过函数体内的系统调用来进行的。另外,系统调用是内核的一部分,而普通的函数却不是。

说到系统调用就不得不提及另外一对概念——内核态和用户态。为了保证操作系统的稳定和安全,内核依据由CPU提供的、可以让进程驻留的特权级别建立了两个特权状态。它们就是内核态和用户态。在大部分时间里CPU都处于用户态。这时CPU只能对用户空间进行访问。换言之,CPU在用户态下运行的用户进程是不能与内核接触的。当用户进程发出一个系统调用的时候,内核会把CPU从用户态切换到内核态,而后会让CPU执行对应的内核函数。CPU在内核态下是有权限访问内核空间的。这就相当于使用户进程通过系统调用使用到了内核提供的功能。当内核函数被执行完毕后,内核会把CPU从内核态切换回用户态,并把执行结果返回给用户进程。图6-4大致地描述了系统调用过程中的CPU状态切换和流程控制。

{%}

图 6-4 关于系统调用过程的示意图

这幅示意图描绘的流程比我们刚刚叙述得更加细致一些。从中我们可以看出一个系统调用从开始到结束的较完整流程。其中,封装例程与系统调用是一一对应的。实际上,它就是我们所说的内核暴露给用户进程的接口。另外,我们可以把图中的系统调用处理程序和系统调用服务例程看作是内核为了响应用户进程的系统调用而执行的一系列函数。我们在上面的叙述中把它们统称为了内核函数。最后,再强调一下,只有当CPU被切换至核心态之后才可以执行内核空间中的函数,而在内核函数被执行完毕后,CPU状态也会被及时地切换回用户态。

7. 进程的切换和调度

与其他分时操作系统一样,Linux操作系统也可以凭借CPU的威力,快速地在多个进程之间进行切换(也被称为进程间的上下文切换),以产生多个进程在同时运行的假象。每个进程也都会认为自己独占了CPU。这就是多任务操作系统这个称谓的由来。不过,无论切换速度如何,在同一时刻正在运行的进程也仅仅会有一个。当然,切换CPU正在运行的进程是需要付出代价的。例如,内核此刻要换下正在CPU上运行的进程A,并让CPU开始运行进程B。在换下进程A之前,内核必须要及时保存进程A的运行时状态。另一方面,假设进程B不是第一次被运行,那么在让进程B被重新运行之前,内核必须要保证已经依据之前保存的相关信息,把进程B恢复到之前被换下时的运行时状态。当然,需要运行的进程往往不只两个。但是在处理流程上是相通的。我们把这种在进程换出换入期间所必须要做的任务统称为进程切换。进程切换主要是由内核来完成的。除了进程切换之外,为了使各个生存着的进程都有被运行的机会、让它们共享CPU,内核还要考虑把哪一个进程应该作为下一个被运行的进程、应该在哪一时刻进行切换,以及被换下的进程需要在哪一时刻再被换上,等等。解决类似问题的方案和任务被统称为进程调度。

进程切换和进程调度是程序并发执行的基础。没有它们,程序的并发执行就无从谈起,我们所说的并发编程(单个计算机环境下的并发编程)也就没有任何现实意义了。不过,我们并不打算对它们进行详细的说明。因为这会非常耗费篇幅,并且也偏离了我们的主题。但是,我们确实应该关注随之而来的一些问题及其解决方案。请读者接着往下看。

6.2.2 关于同步

内核对进程的合理切换和调度使得多个进程可以被有条不紊地并发运行。在很多时候,多个进程之间需要相互配合并合作完成一个任务。这就需要有进程间通讯机制(IPC)的支持。不过在详细讲解各种IPC方法之前,我们先来了解一下进程之间在通讯过程中可能发生的干扰。这种干扰主要集中在有共享数据的情况下。不论是多CPU、多进程还是我们之后要提到的多线程,只要它们之前存在数据共享,就一定会牵扯到同步问题。不管被共享的数据是被存储在内存中、磁盘上,还是其他被共用的数据介质上,都会是这样。所以,我们接下来要讨论的这个问题是具有普遍意义的。其中的一些概念和论点可以适用于很多场景。

首先,我们考虑一个看似简单的应用场景——计数器。这个计数器由进程A创建并与进程B共享。进程A和进程B实际上执行了相同的程序。这个程序的任务是把符合某些条件的数据从数据库迁移到磁盘上。程序总是按照固定顺序从数据库中查询数据,并使用计数器记录的已被查询的数据的最大行号作为依据。下面是程序的具体任务步骤。

  1. 读取计数器的值。

  2. 从数据库中查询数据。如果我们用c来代表计数器的值,那么查询的范围就是行号在[c, c+100000)范围内的数据。也就是说,每次查询10万条数据。

  3. 遍历并筛选出符合条件的数据,并组成新的数据集合。

  4. 将新数据集合存储到指定目录的文件中。该文件的名称总是有一致的主名称data并会以递增的序号为后缀。例如,data1、data2,等等。

  5. 把计数器的值加100000。也就是说,计数器的新值就是下次要查询的数据的首行行号。

  6. 检查数据是否已被全部读完。如果是则直接退出,否则跳转回1。

进程A和进程B会被并发地运行。它们会各自循环往复地迁移它们认为的下一个数据集合,直到数据被全部迁移完毕。

这会出现问题吗?答案是肯定的。我们已经知道,每个进程在每次对指定数据集合的迁移过程中都需要完成上述6个步骤。由于内核会对各个进程切换和调度,因此不能保证进程在迁移每个数据集合的过程中都不被打断。也就是说,进程A和进程B的运行是互相穿插在一起的。这种穿插或者说切换的粒度会比我们上面罗列的步骤的粒度还要小很多。不过,为了清晰,我们假设进程切换的粒度与以上步骤的粒度相同。下面,我们在这个假设的基础上叙述一种可能的进程调度过程。

  1. 内核使CPU运行进程A。

  2. 进程A读取计数器的值1,并依此查询并筛选了数据,得到了新的数据集合。

  3. 内核认为进程A已经运行了足够长的时间,所以它把进程A换下并让CPU开始运行进程B。

  4. 进程B读取计数器的值1,并依此查询并筛选了数据,得到了新的数据集合。注意,这个数据集合与进程A刚刚得到的那个数据集合完全一样。

  5. 进程B把得到的数据集合写入名称为data1的文件,并在写入完成后关闭文件。

  6. 内核把进程B换下并让CPU开始运行进程A。

  7. 进程A把得到的数据集合写入名称为data1的文件,并在写入完成后关闭文件。

  8. 进程A把计数器的值更新为100001

  9. 内核把进程A换下并让CPU开始运行进程B。

  10. 进程B把计数器的值更新为100001

上述进程调度过程如图6-5所示。

{%}

图 6-5 初始流程下的进程调度过程

好了,到这里我们已经看出了一个很明显的问题,即进程A和进程B在做重复的事。更大的问题是,它们造成了双倍的资源消耗,并导致了事倍功半的结果。这是由于同一个进程对计数器的值的读取和更新之间的时间跨度太大了,以至于计数器只起到了任务进度记录的作用,而没有起到在两个进程之间协调的作用。既然这是由于时间跨度大的两个操作引起的,那么我们就把这个时间跨度缩小到最小,看看会不会解决此问题。我们把前面所说的程序的具体任务步骤中的第5步上移至第2步。也就是说,我们让一个进程在读取计数器的值之后马上更新它。这样,之后CPU运行的另一个进程就会去查询并处理后面的数据行了。

但是,如此真的能够彻底的解决上述问题吗?非常遗憾,答案是不能。请想象一下这样的进程调度过程,如果内核在进程A已经读取却还未更新计数器的值的时候让CPU转而运行进程B,会发生什么?请看图6-6。进程B在得到计数器的值1之后把该值更新为了100001。但是注意,它仍然会去做进程A即将要去做的事(查询行号在[c, c+100000)范围内的数据、筛选并保存到文件data1)。当进程A重新获得运行时机的时候也依旧会从查询行号在[c, c+100000)范围内的数据开始。为了突出重点,我们省略掉了所有的“写入并关闭文件”的步骤。

{%}

图 6-6 改进流程下的进程调度过程

显然,从第二个进程调度过程来看,我们对流程的更改并没有起到什么作用。上述问题仍会出现。内核是无法理解程序中各个语句的语义的,因此也就无法保证总是会在合适的时机切换进程。但是,新的流程确实要比之前的流程好得多。因为它大大减小了上述问题出现的概率。

做过此类工作的编程人员可能会提出一个新的解决方案,那就是在更新计数器的值的那一刻之前再去读取一下计数器的值,以保证更新不会重复。但是我们很难实现这种重新进行操作的方法,因为我们无法判断在进程获取计数器值和更新计数器值期间该进程是否被切换过。并且,即使这种方法可以被实现,它也不会解决现有问题。更糟糕的是,它会使事情变得更加复杂。一个可能的进程调度过程如图6-7所示。

{%}

图 6-7 更糟糕的流程下的进程调度过程

从图6-7中可以看出,重复的计数器值获取操作导致了行号在[100001, 200000]范围内的数据没有被处理。这个问题比重复处理数据的那个问题更为严重。

那到底怎样才是正确的解决方法呢?在揭晓答案之前,让我们先来熟悉一下相关的概念。前述的3个版本的程序流程都存在不同程度的问题。其中,第一个版本的程序流程的问题最为明显。它不但会导致计数器值的重复设置以及数据的重复处理,可能还会使文件中的数据造成错乱。想象一下,如果两个进程同时(或者说交错地)把大量数据写入到同一个文件中,会造成怎样的后果。

当几个进程同时对同一个资源进行访问的时候,就很可能会造成互相的干扰。这种互相干扰通常被称为竞态条件(race condition)。竞态条件通常在编码和测试过程中难以被察觉到。我们在前面列举的可能的进程调度过程都属于特例。它们的发生可能不会那么频繁。但是,一旦发生就绝对会造成程序运行结果的错误。更为重要的是,排查这种错误是比较困难的。正因为它们的发生并不频繁,所以场景重现变得非常不容易。这需要满足一系列特定的条件才有可能做到。找到并消除一个竞态条件可能会让程序运维人员耗上几个小时甚至几天的时间,尤其是在对底层的运行机制不了解的情况下。

相比于其他现代编程语言,Go语言的并发编程模型更加成熟和先进。它的目标就在于大幅减少编程人员产生竞态条件的可能。它尽可能多地把复杂的并发处理逻辑埋藏在它的运行时系统之下,让编程人员能够腾出精力和时间去解决真正的业务问题。关于Go语言的并发编程模型的详细介绍我们放在了本章的最后。

现在回到本小节的主题。造成竞态条件的根本原因在于进程在进行某些操作的时候被中断了。虽然进程在被再次运行的时候其状态会恢复如初,但是外界环境很可能已经在这极短的时间内被改变甚至面目全非了,就像我们改进后的第二个版本的程序流程那样。从访问计数器的方面看,它几乎已经能够成为有效的解决方案了。但就是由于应用程序对进程调度的不可控性使得竞态条件仍然可能发生。反过来说,如果能够保证计数器的值的获取并更新是一个原子操作的话,那么竞态条件就不会发生。更具体地,如果进程A在获取并更新计数器的值的过程中不被中断,那么进程B就会如我们所愿地去处理行号在[100001, 200000]范围内的数据了。

在这里,我们把执行过程中不能被中断的操作称为原子操作(atomic operation),而把只能被串行化的访问或执行的某个资源或某段代码称为临界区(critical section)。在第二个版本的程序流程中,每个进程对计数器的获取和更新操作都应该被看作是一个单一的、不应该被中断的操作。因此,它们应该组成一个原子操作。并且,这两个操作应该被串行化的执行,即在一个进程对计数器的值的获取并更新操作还未完成之前其他进程不得介入。因此,体现计数器的获取操作和更新操作的代码应该共同形成为一个临界区。顺便提一句,所有的系统调用都属于原子操作。我们不用担心它们的执行会被中断。

我们可以看出,原子操作和临界区这两个概念看起来有些相似。但是它们有一个明显的不同。原子操作是不能被中断的,临界区对是否可以被中断却没有强制的规定。只要保证一个访问者在临界区中的时候其他访问者不会被放进来就可以了。这也意味着它们的强度是不同的。

原子操作必须由一个单一的汇编指令代表,并且需要得到芯片级别的支持。当今的CPU中都提供了对原子操作的支持。即使是在多核CPU或多CPU的计算机系统中,原子操作也是可以被保证的。这使得原子操作能够做到绝对的并发安全,并且比其他同步机制要快很多。不过,读者可能会考虑这样一个问题:如果一个原子操作的执行总是无法结束而我们又无法中断它,那该怎么办?实际上,这也是内核只提供针对二进制位和整数的原子操作的原因。原子操作只适合细粒度的简单操作,就像前面讲述的对计数器的值的获取并更新操作那样。Go语言也在CPU和各个操作系统的底层支撑之上提供了对原子操作的支持。它们由标准库代码包sync/atomic中的一些函数代表。我们会在第8章详细说明它们。

相比之下,让要求被串行化执行的若干代码形成临界区的这种做法更加通用。保证只有一个进程或线程在临界区之内的这种做法有一个官方称谓——互斥(mutual exclusion,简称mutex)。实现互斥的方法必须确保排他原则(exclusion principle),并且这种保证不能依赖于任何计算机硬件(包括CPU)。也就是说,互斥方法必须有效且通用。时至今日,互斥方法的实现方式非常多样,有的只停留在理论层面,而有的已经成为了各个操作系统的标配。作为IPC方法之一的信号灯就属于后者。在Go语言的sync代码包中也包含了几个提供了互斥方法的类型。我们同样会在第8章讲解它们。

好了,我们对同步的介绍暂时就到这里。不过,在后面讲解多线程编程的时候,我们还会重返这一主题。

6.2.3 管道

管道(pipe)是一种半双工的(或者说单向的)通讯方式。它只能被用于父进程与子进程以及同祖先的子进程之间的通讯。例如,我们在使用shell命令的时候常常会用到管道:

  1. hc@ubt:~$ ps aux | grep go

shell为每个命令都创建一个进程,然后把左边的命令的标准输出用管道与右边的命令的标准输入连接起来。管道的优点在于它的简单,而缺点则是只能单向通讯以及对通讯双方关系上的严格限制。

对于管道,Go语言是支持的。通过标准库代码包os/exec中的API,我们可以执行操作系统命令并在此之上建立管道。我们可以像这样创建一个exec.Cmd类型的值:

  1. cmd0 := exec.Command("echo", "-n", "My first command from golang.")

变量cmd0的值与操作系统命令

  1. echo -n "My first command from golang."

是对应的。在exec.Cmd类型之上有一个名为Start的方法。我们可以使用这个方法启动一个操作系统命令,像这样:

  1. if err := cmd0.Start(); err != nil {
  2. fmt.Printf("Error: The command No.0 can not be startup: %s\n", err)
  3. return
  4. }

但是为了创建一个能够获取此命令的输出的管道,我们需要在上面这条if语句之前加入这样几条语句:

  1. stdout0, err := cmd0.StdoutPipe()
  2. if err != nil {
  3. fmt.Printf("Error: Can not obtain the stdout pipe for command No.0: %s\n", err)
  4. return
  5. }

变量cmd0的值的StdoutPipe方法会返回一个输出管道。在这里,我们把代表这个输出管道的值赋给了变量stdout0。它的类型是io.ReadCloser。这是一个接口类型并扩展了接口类型io.Reader。这样,在我们启动该命令之后就可以调用stdout0的值的Read方法来获取这个命令的输出了:

  1. output0 := make([]byte, 30)
  2. n, err := stdout0.Read(output0)
  3. if err != nil {
  4. fmt.Printf("Error: Can not read data from the pipe: %s\n", err)
  5. return
  6. }
  7. fmt.Printf("%s\n", output0[:n])

方法Read会把读出的输出数据存入调用方传递给它的字节切片(这里是output0的值)中并返回一个int类型值和一个error类型值。如果命令的输出小于output0的值的长度,那么变量n的值就代表了命令实际输出的字节的数量。否则,n的值就等于output0的值的长度。后一种情况常常意味着我们并没有完全读出输出管道中的数据。这时,我们通常需要再去读取一次或多次(可以使用for语句进行循环读取)。如果输出管道中再没有可以被读取的数据了,那么Read方法返回的第二个结果值就会是变量io.EOF的值。我们可以依此来判断数据是否已经被读完。像这样:

  1. var outputBuf0 bytes.Buffer
  2. for {
  3. tempOutput := make([]byte, 5)
  4. n, err := stdout0.Read(tempOutput)
  5. if err != nil {
  6. if err == io.EOF {
  7. break
  8. } else {
  9. fmt.Printf("Error: Can not read data from the pipe: %s\n", err)
  10. return
  11. }
  12. }
  13. if n > 0 {
  14. outputBuf0.Write(tempOutput[:n])
  15. }
  16. }
  17. fmt.Printf("%s\n", outputBuf0.String())

为了达到效果,我们故意将作为Read方法参数的字节切片的长度设置得很小。另外,为了收集每次迭代读到的输出内容,我们这些内容依次存放入到一个缓冲区中,并在最后将此缓冲区中的内容打印出来。

不过,一个更加方便的方法是,一开始就使用带缓冲的读取器(以下简称缓冲读取器)从输出管道中读取数据,像这样:

  1. outputBuf0 := bufio.NewReader(stdout0)
  2. output0, _, err := outputBuf0.ReadLine()
  3. if err != nil {
  4. fmt.Printf("Error: Can not read data from the pipe: %s\n", err)
  5. return
  6. }
  7. fmt.Printf("%s\n", string(output0))

由于stdout0的值也是io.Reader类型的,所以我们可以把它作为bufio.NewReader函数的参数。这个函数会返回一个bufio.Reader类型的值。它就是我们刚刚提到的缓冲读取器。在默认情况下,该读取器会携带一个长度为4096的缓冲区。缓冲区的长度代表了我们一次可以读取的字节的最大数量。由于cmd0代表的命令只会输出一行内容,所以我们可以直接用outputBuf0ReadLine方法来读取它。这个方法的第二个bool类型的结果值表明了当前行是否还未被读完。如果为false,我们依然可以利用for语句来读出剩余的数据。不过在这里我们并不需要这样做,所以我们把第二个结果赋给了空标识符“_”。另外,我们总是需要先检查err的值,看看是否有错误发生。如果没有任何错误,那么我们就可以放心地处理output0的值了。

使用带缓冲区的读取器的好处是我们可以非常方便和灵活地读取需要的内容,而不是只能先把所有内容都读出来再做处理。读者可以考虑一下,如果我们不使用缓冲读取器,那么从stdout0的值中读取一行内容的代码应该怎样编写。显然,这省去了我们自己的一些工作量。缓冲读取器提供的功能远比我们在这里展示得强大得多。请详见bufio代码包的文档。

好了,言归正传。我们已经知道,管道是一个单向数据通道。它可以把一个命令的输出作为另一个命令的输入。当然,我们也可以使用Go语言代码做到这一点。假设我们有如下两个exec.Cmd类型值:

  1. cmd1 := exec.Command("ps", "aux")
  2. cmd2 := exec.Command("grep", "apipe")

现在,我们在cmd1代表的命令之上建立一个输出管道,然后启动这个命令:

  1. stdout1, err := cmd1.StdoutPipe()
  2. if err != nil {
  3. fmt.Printf("Error: Can not obtain the stdout pipe for command: %s\n", err)
  4. return
  5. }
  6. if err := cmd1.Start(); err != nil {
  7. fmt.Printf("Error: The command can not be startup: %s\n", err)
  8. return
  9. }

这与我们操纵cmd0时的代码几乎相同。在这之后,我们通过StdinPipe方法在cmd2之上建立一个输入管道,并把与cmd1连接的输出管道中的数据全部写入到这个输入管道中:

  1. outputBuf1 := bufio.NewReader(stdout1)
  2. stdin2, err := cmd2.StdinPipe()
  3. if err != nil {
  4. fmt.Printf("Error: Can not obtain the stdin pipe for command: %s\n", err)
  5. return
  6. }
  7. outputBuf1.WriteTo(stdin2)

变量cmd2的值的StdinPipe方法会返回两个结果值。第一个结果值就是与该命令连接的输入管道。它是一个io.WriteCloser接口类型的值。这个接口类型扩展了io.Writer接口类型。正因为如此,它可以被作为缓冲读取器outputBuf1WriteTo方法的参数。这个方法会把所属值中缓存的数据全部写入到参数值代表的写入器中。这样就等于把第一个命令的输出内容通过管道传递给了第二个命令。

不过这还不算完。我们还需要启动cmd2并关闭与它连接的输入管道,以完成数据的传递。另外,为了获取到cmd2的输出结果,我们还需要附加两行代码。请看下面的示例:

  1. var outputBuf2 bytes.Buffer
  2. cmd2.Stdout = &outputBuf2
  3. if err := cmd2.Start(); err != nil {
  4. fmt.Printf("Error: The command can not be startup: %s\n", err)
  5. return
  6. }
  7. err = stdin2.Close()
  8. if err != nil {
  9. fmt.Printf("Error: Can not close the stdio pipe: %s\n", err)
  10. return
  11. }

我们初始化了一个缓冲区outputBuf2,并把它赋给了cmd2Stdout字段。这样,命令cmd2被启动后的所有输出内容就都会被写入到该缓冲区中。之后,我们启动了cmd2,并关闭了stdin2

为了获取到cmd2的所有输出内容,我们需要等到它运行结束后,再去查看缓冲区outputBuf2中的内容。因此,我们还需要调用cmd2Wait方法。像这样:

  1. if err := cmd2.Wait(); err != nil {
  2. fmt.Printf("Error: Can not wait for the command: %s\n", err)
  3. return
  4. }

方法Wait会一直阻塞到其所属的命令完全运行结束为止。这样,我们再去处理outputBuf2中的内容就完全没有问题了。

这个基于cmd1cmd2的示例模拟出了操作系统命令

  1. ps aux | grep apipe

的执行效果。不过,cmd2的输出会与直接运行这个操作系统命令得到的输出有所不同。因为该示例程序相当于在自身运行的过程当中又运行了上面的这个操作系统命令。

我把上面这些关于管道的示例代码都放到了goc2p项目的multiproc/apipe代码包中的命令源码文件中。读者可以使用go run命令运行其中的命令源码文件,并比较和分析刚才所说的在最终输出上的不同。

我们上面所讲的管道也被叫作匿名管道,与此相对的是命名管道(named pipe)。与匿名管道不同的是,任何进程都可以通过命名管道交换数据。实际上,命名管道以文件的形式存在于文件系统中。使用它的方法与使用文件很类似。Linux操作系统支持使用shell命令创建和使用命名管道。例如:

  1. hc@ubt:~$ mkfifo -m 644 myfifo1
  2. hc@ubt:~$ tee dst.log < myfifo1 &
  3. [1] 3485
  4. hc@ubt:~$ cat src.log > myfifo1

在上面的示例中,我们先使用命令mkfifo在当前目录创建了一个命名管道myfifo1,然后又使用这个命名管道和命令tee把src.log文件中的内容写到了dst.log文件中。为了简单,我们只是使用命名管道搬运了数据。实际上,在此基础上我们可以实现诸如数据的过滤或转换,以及管道的多路复用等功能。注意,命名管道默认是阻塞式的。更具体地说,只有在对这个命令管道的读操作和写操作都已准备就绪之后,数据才会开始流转。相对于匿名管道,命名管道最大的优势就是通讯双方可以毫不相关。并且,我们可以使用它建立非线性的连接以实现数据的多路复用。但要注意,命名管道仍然是单向的。又由于我们可以在命名管道之上实现多路复用,所以有时候也需要考虑多个进程同时向命名管道写数据的情况下的操作原子性问题。

在Go语言标准库代码包os中包含了可以创建这种独立管道的API。创建一个命名管道的代码非常简单,如下:

  1. reader, writer, err := os.Pipe()

函数Pipe会返回两个结果值。第一个结果值是代表了该管道输出端的os.File类型值,因第二个结果是代表了该管道输入端的os.File类型值。既然它们都是os.File类型的,那么我们就可以在它们之上调用os.File类型包含的所有方法。不过,os.Pipe方法返回的前两个结果值只是让我们用来传递数据的渠道而已。在底层,Go语言使用系统函数来创建管道,并把它的两端封装成两个*os.File类型的值。假设,有这样的两段代码:

  1. n, err := writer.Write(input)
  2. if err != nil {
  3. fmt.Printf("Error: Can not write data to the named pipe: %s\n", err)
  4. }
  5. fmt.Printf("Written %d byte(s). [file-based pipe]\n", n)

  1. output := make([]byte, 100)
  2. n, err := reader.Read(output)
  3. if err != nil {
  4. fmt.Printf("Error: Can not read data from the named pipe: %s\n", err)
  5. }
  6. fmt.Printf("Read %d byte(s). [file-based pipe]\n", n)

如果它们是被并发运行的,那么我们在reader之上调用Read方法就可以按顺序获取到之前通过调用writerWriter方法写入的数据。为什么强调是并发运行?因为命名管道默认会在其中一端还未就绪的时候阻塞另一端的进程。Go语言在这里提供给我们的命名管道的行为特征也是如此。所以,如果我们顺序地执行这两段代码,那么程序肯定会被永远阻塞在语句

  1. n, err := writer.Write(input)

  1. n, err := reader.Read(output)

出现的地方。具体被阻塞在哪儿取决于调用表达式writer.Write(input)reader.Read(output)哪一个先被求值。

另外,我们已经知道,管道都是单向的。因此,我们不能反过来使用readerwriter。也就是说,我们在reader之上调用Writer方法或者在writerRead方法之后获取的第二个结果值都将是一个非nilerror类型值。其中的信息会告诉我们,这样的访问是不被允许的。另外,不论我们在哪一方调用Close方法,都不会影响到另一方的读取或写入数据的操作。

实际上,我们在exec.Cmd类型值之上调用StdinPipeStdoutPipe方法后得到的输入管道或输出管道也是通过os.Pipe函数生成的。只不过,在这两个方法内部又对刚刚生成的管道做了少许的附加处理。输入管道的输出端会在所属命令启动后就被立即关闭,而输入端则会在所属命令运行结束之后被关闭。而输出管道的两端的自动关闭的时机与前面刚好相反。不过要注意,有些命令会等到输入管道被关闭之后才结束运行。所以,在这种情况下,我们就需要在数据被读取之后尽早地手动关闭输入管道。在前面的示例中,我们已经有过类似的演示:

  1. if err := cmd2.Start(); err != nil {
  2. // 省略若干条语句
  3. }
  4. err = stdin2.Close()
  5. // 省略若干条语句
  6. if err := cmd2.Wait(); err != nil {
  7. // 省略若干条语句
  8. }

请读者在必要时依照上面这样的操作顺序。由于输出管道实际上也是由os.Pipe函数生成的,所以我们在使用某个exec.Cmd类型值上的输出管道的时候也需要有所注意。例如,我们不能在读完输出管道中的全部数据之前调用该值的Wait方法。又例如,只要我们建立了对应的输出管道就不能使用Run方法来启动该命令,而应该使用Start方法。

由于通过os.Pipe函数生成的管道在底层是由系统级别的管道来支持的,所以我们在使用它的时候,要注意操作系统对管道的限制。例如,匿名管道会在管道缓冲区被写满之后使写数据的进程阻塞,以及我们已经在前面说过的命名管道会在其中一端未就绪前阻塞另一端的进程,等等。

我们在前面讲过,命名管道可以被多路复用。所以,当有多个输入端同时写入数据的时候我们就不得不需要考虑操作原子性的问题。操作系统提供的管道是不提供原子操作支持的。为此,Go语言在标准库代码包io中提供了一个被存于内存中的、有原子性操作保证的管道(以下简称内存管道)。我们生成它的方法与之前的很相似:

  1. reader, writer := io.Pipe()

函数io.Pipe返回两个结果值。第一个结果值是代表了该管道输出端的PipeReader类型值,第二个结果值是代表了该管道输入端的PipeWriter类型值。PipeReader类型和PipeWriter类型分别对管道的输出端和输入端做了很好的操作限制,即在PipeReader类型的值上我们只能使用Read方法从管道中读取数据,而在PipeWriter类型的值上我们则只能通过Write方法向管道写入数据。这样就有效避免了管道使用者对管道的反向使用。另一方面,我们在使用Close方法关闭管道的某一端之后,另一端在写数据或读数据的时候会得到一个预定义的error类型值。不过我们也可以通过调用CloseWithError来自定义另一端将会得到的error类型值。

另外,还需要注意,与os.Pipe函数生成的管道相同的是,我们仍然需要并发的运行被用来在内存管道的两端进行操作的代码。

在内存管道的内部是通过充分使用sync代码包中提供的API来从根本上保证操作的原子性的。所以,我们可以在它之上放心地并发写入和读取数据。另外,由于这种管道并不是基于文件系统的,并没有作为中介的缓冲区,所以通过它传递的数据只会被复制一次。这也就更进一步地提高了数据传递的效率。

我们上面所展示的关于命名管道以及内存管道的示例代码都被集中放置在了goc2p项目的multiproc/npipe代码包中,以供读者参考和取用。虽然它们的使用方法都非常简单,但是其中的一些运用技巧(尤其是命名管道)还是值得我们特别记忆的。

至此,我们介绍了系统级别的管道的概念和基本用法,以及Go语言标准库中与系统管道对应的若干API的使用方法和技巧。另外,我们还简单地说明了Go语言特别提供的一种基于内存的同步管道的创建和使用方法。

6.2.4 信号

操作系统信号(Signal,以下简称信号)是IPC中唯一一种异步的通讯方法。它的本质是用软件来模拟硬件的中断机制。信号被用来通知某个进程有某个事件发生了。例如,在命令行终端下按下某些快捷键就会挂起或停止正在运行的程序。又例如,我们通过kill命令杀死某个进程的操作也有信号的参与。

每一个信号都有一个以“SIG”为前缀的名字。例如,我们会在稍后看到的SIGINT、SIGQUIT以及SIGKILL,等等。但是,在操作系统内部,这些信号都由正整数代表。这些正整数被称为信号编号。在Linux操作系统的命令行终端下,我们可以使用kill命令来查看当前系统所支持的信号,如图6-8所示。

{%}

图 6-8 Linux操作系统支持的信号

可以看到,Linux操作系统支持的信号有62种(注意,没有编号为32和33的信号)。其中,编号从1到31的信号属于标准信号(也被称为不可靠信号),而编号从34到64的信号属于实时信号(也被称为可靠信号)。对于同一个进程来说,每种标准信号只会被记录并处理一次。并且,如果发送给某一个进程的标准信号的种类有多个,那么它们被处理的顺序也是完全不确定的。而实时信号解决了标准信号的这两个问题,即多个同种类的实时信号都可以被记录在案,并且它们可以按照信号的发送顺序被处理。虽然实时信号在功能上更为强大,但是已成为事实标准的标准信号也无法被替换掉。因此,这两大类信号一直共存着。

为了贴紧主题,我们下面仅仅会涉及使用Go语言开发信号处理程序所必需的知识。关于信号的完整概念和知识请读者参阅有关的文档和图书。

简单来说,信号的来源有键盘输入(比如按下快捷键Ctrl-c)、硬件故障,系统函数调用和软件中的非法运算。进程响应信号的方式有3种:忽略、捕捉和执行默认操作。

Linux操作系统对每一个标准信号都有默认的操作方式。针对不同种类的标准信号,其默认的操作方式一定会是以下操作中的一个:终止进程、忽略该信号、终止进程并保存内存信息、停止进程、若进程已停止就恢复。

对于绝大多数标准信号而言,我们可以自定义当进程接收到它们之后应该进行怎样的处理。这种自定义信号响应的唯一方法是,进程要告知操作系统内核:当某种信号到来时,需要执行某某操作。在程序中,这些作为信号响应的自定义操作往往是由函数来代表的。

go命令会对其中的一些以键盘输入为来源的标信号作出响应。这是由于go命令使用了在标准库代码包os/signal中的被用于处理信号的API。更具体地讲,go命令指定了需要被处理的信号并用一种很优雅的方式(使用到了通道类型的变量)来监听信号的到来。

下面我们从os.Signal接口类型开始讲起。该类型的声明如下:

  1. type Signal interface {
  2. String() string
  3. Signal() // to distinguish from other Stringers
  4. }

os.Signal接口类型的声明可知,其中的Signal方法的声明并没有实际意义。它只是作为os.Signal接口类型的一个标识。因此,在Go语言标准库中,此接口类型的所有实现类型的Signal方法都是空方法(方法体中没有任何语句)。

所有此接口类型的实现类型的值都应该可以代表一个操作系统信号。理所当然,其中每一个操作系统信号都是需要由操作系统支持的。换句话说,它们都是依赖于操作系统的。

在Go语言的标准库中,已经包含了与不同操作系统的信号相对应的程序实体。在标准库代码包syscall中,已经为不同的操作系统所支持的每一个标准信号都声明了一个相应的同名常量(以下简称信号常量)。这些信号常量的类型都是syscall.Signal的。syscall.Signalos.Signal接口类型的一个实现类型,同时也是一个int类型的别名类型。这就意味着,每一个信号常量都隐含着一个整数值。而信号常量的整数值与它所代表的信号在所属操作系统中的编号是一致的。

另外,如果我们查看syscall.Signal类型的String方法的源代码,还会发现一个包级私有的、名为signals的变量。在这个数组类型的变量中,每个索引值都代表了一个标准信号的编号,而对应的元素则是针对该信号的一个简短的描述。这些描述会分别出现在那些信号常量的字符串表示形式中。

好了,在了解了这些基础之后,我们就可以尝试使用os/signal代码包中的API来接受和处理操作系统的信号了。

代码包os/signal中的Notify函数用来把操作系统发给当前进程的指定信号通知给该函数的调用方。我们先来看看该函数的声明:

  1. func Notify(c chan<- os.Signal, sig ...os.Signal)

函数signal.Notify的第一个参数是通道类型的。虽然我们还没有正式讲通道类型,但是在这里还是有必要简单解释一下这个参数。这个参数的类型是chan<- os.Signal。这表示,在该参数中只能传递os.Signal类型的值(以下简称信号值)。并且,在函数signal.Notify中,只能向该通道类型值放入信号值,而不能从该值中取出信号值。这一约束是由在关键字chan右边的接收操作符<-代表的。signal.Notify函数会把当前进程接收到的指定信号放入参数c代表的通道类型值(以下简称signal接收通道)中。这样,调用方代码就可以从这个signal接收通道中按顺序的获取到操作系统发来的信号并进行相应的处理了。

函数signal.Notify的第二个参数是一个可变长的参数。这意味着我们在调用signal.Notify函数的时候,可以在第一个参数值之后再附加任意个os.Signal类型的参数值。参数sig代表的参数值应该包含我们希望自行处理的所有信号。在接收到我们希望自行处理的信号之后,os/signal包中的程序(以下简称signal处理程序)会把它封装成syscall.Signal类型的值并放入到signal接收通道中。当然,我们也可以只为第一个参数绑定实际值。在这种情况下,signal处理程序会把我们的意图理解为想要自行处理所有信号,并把接收到的几乎所有的信号都逐一进行封装并放入到signal接收通道中。下面我们来看一个例子(假设当前操作系统是Linux):

  1. sigRecv := make(chan os.Signal, 1)
  2. sigs := []os.Signal{syscall.SIGINT, syscall.SIGQUIT}
  3. signal.Notify(sigRecv, sigs...)
  4. for sig := range sigRecv {
  5. fmt.Printf("Received a signal: %s\n", sig)
  6. }

在这个示例中,我们先创建了调用signal.Notify函数所需的两个参数的值。变量sigRecv的值就是signal接收通道。我们用内建函数make创建了它。它的元素类型是os.Signal且长度是1。我们希望自行处理SIGINT信号和SIGQUIT信号。所以,变量sigs代表的[]os.Signal类型的切片值包含了syscall.SIGINTsyscall.SIGQUIT两个元素。在我们调用signal.Notify函数之后,立即试图用for语句从signal接收通道中获取信号值。只要sigRecv的值中存在元素值,for语句就会把它们按顺序地接收并赋给迭代变量sig。否则,for语句就会被阻塞,并等待新的元素值被发送到sigRecv的值中。顺便提一句,在sigRecv代表的通道类型值被关闭之后,for语句会立即被退出执行,所以我们不用担心程序会一直在这里循环往复。不过,我们在实际使用for语句迭代通道类型值的时候,不应该如此简单地处理。关于这种用法的细节我们会在第7章介绍。

注意,signal处理程序在向signal接收通道发送值的时候,并不会因为通道已满而产生阻塞。因此,signal.Notify函数的调用方必须保证signal接收通道会有足够的空间缓存并传递接收到的信号。我们可以创建一个足够长的signal接收通道。但是,一个更好的方法是,只创建一个长度为1的signal接收通道,并且时刻准备从该通道中接收信号。我们在上面的示例中也是这么做的。

这个示例中的信号处理代码非常简单,即只是把从signal接收通道中接收的信号的字符串表现形式打印出来而已。在实际的场景中,这样做是比较危险的。因为我们忽略了当前进程本该处理的信号。为什么这么说呢?我们在前面说过,应该把想自行处理的信号追加在传递给signal.Notify函数的第一个参数值的后面。那么,如果当前进程接收到了我们不想自行处理的信号会怎样做呢?答案是,执行由操作系统指定的默认操作。所以,如果我们指定了想要自行处理的信号但又没有在接收到信号时执行必要的处理动作,就相当于使当前进程忽略了这些信号。以SIGINT信号为例。SIGINT信号即中断信号,一般被用来停止一个已经失去控制的程序。如果我们在运行一个Go语言程序的过程中按下快捷键Ctrl-c,那么此程序的运行会被停止。然而,如果在被运行的这个Go语言程序中含有上面示例中的那段代码的话,无论我们按下多少次Ctrl-c都不能让它停下来,而仅仅会使标准输出上多出几行信息。试想一下,如果上面那段代码被修改为这样(注意第二行代码)会怎样:

  1. sigRecv := make(chan os.Signal, 1)
  2. signal.Notify(sigRecv)
  3. for sig := range sigRecv {
  4. fmt.Printf("Received a signal: %s\n", sig)
  5. }

如果被运行的Go语言程序中包含了这段代码,那么发给该进程的所有信号几乎都会被忽略掉。这样做而导致的后果可能是很悲剧的。

不过,幸好在类Unix操作系统下有两种信号既不能被自行处理也不会被忽略,它们是:SIGKILL和SIGSTOP。对它们的响应只能是执行系统默认操作。这种策略的最根本的原因是:它们向系统的超级用户提供了使进程终止或停止的可靠方法。系统不允许任何程序消除或改变与这两个信号所对应的处理动作。即使我们在程序中这样调用signal.Notify函数:

  1. signal.Notify(sigRecv, syscall.SIGKILL, syscall.SIGSTOP)

也不会改变当前进程对SIGKILL信号和SIGSTOP信号的处理动作。这种保障,不论对于应用程序还是操作系统来说,都是非常有必要的。

对于其他信号,我们除了能够自行处理它们之外,还可以在之后的任意时刻恢复针对它们的系统默认操作。这需要使用到os/signal包中的Stop函数。它的声明如下:

  1. func Stop(c chan<- os.Signal)

在函数signal.Stop的声明中只有一个参数声明。并且,这个参数声明与signal.Notify函数的第一个参数声明完全一致。这并不是巧合,而是有意为之。

函数signal.Stop会取消掉在之前调用signal.Notify函数的时候告知signal处理程序需要自行处理若干信号的行为。只有我们把当初传递给signal.Notify函数的那个signal接收通道作为调用signal.Stop函数时的参数值,才能如愿以偿地取消掉之前的行为,否则调用signal.Stop函数不会起到任何作用。在对signal.Stop函数的调用完成之后,作为其参数的signal接收通道将不会再被发送任何信号。这里存在一个副作用,即在之前示例中的那条被用于从signal接收通道接收信号值的for语句将会被一直阻塞。为了消除这种副作用,我们可以在调用signal.Stop函数之后使用内建函数close关闭该signal接收通道,就像下面这样:

  1. signal.Stop(sigRecv)
  2. close(sigRecv)

signal接收通道sigRecv被关闭之后,被用于从它那里接收信号值的for语句就会被退出执行。

在很多时候,我们可能并不想完全取消掉自行处理信号的行为,而只是想取消对一部分信号的自行处理。为了达到这个目的,我们只需再次signal.Notify函数,并重新设定与其参数sig绑定的、以os.Signal为元素类型的切片类型值(以下简称信号集合),只要作为第一个参数的signal接收通道相同就可以。在我们对signal.Notify函数的调用完成后,signal处理程序会发送给signal接收通道的信号的种类也会发生相应的改变。这完全取决于我们传递给signal.Notify函数的os.Signal类型值都有哪些。

有些读者可能会有疑问:如果signal接收通道不同又会怎样?答案是这样的:如果我们先后调用了两次signal.Notify函数,但是两次传递给该函数的signal接收通道不同,那么signal处理程序会视为这两次调用毫不相干。它会分别看待这两次调用时所设定的信号的集合。

我们把前面比较散碎的示例整理成关于信号的第一个完整示例,并把这个示例存放在一个单独的函数(以下简称示例函数)中。由于这个完整示例中的信息量比较大,所以我们下面分阶段来展示和讲解。

第一个阶段的代码如下:

  1. sigRecv1 := make(chan os.Signal, 1)
  2. sigs1 := []os.Signal{syscall.SIGINT, syscall.SIGQUIT}
  3. fmt.Printf("Set notification for %s... [sigRecv1]\n", sigs1)
  4. signal.Notify(sigRecv1, sigs1...)
  5. sigRecv2 := make(chan os.Signal, 1)
  6. sigs2 := []os.Signal{syscall.SIGQUIT}
  7. fmt.Printf("Set notification for %s... [sigRecv2]\n", sigs2)
  8. signal.Notify(sigRecv2, sigs2...)

我们先后调用了两次signal.Notify函数,并且每次传递给它的signal接收通道并不相同。为了清晰起见,我们也同样初始化了两个信号集合。第一次调用时设定的信号集合中包含了SIGINT信号和SIGQUIT信号,而第二次调用时的信号集合中只有SIGQUIT信号。如此一来,如果当前进程接收到的是SIGQUIT信号,那么signal处理程序会把它封装之后先后发送给signal接收通道sigRecv1sigRecv2。而如果接收到的是SIGINT信号,那么signal处理程序只会把封装好之后的信号发送给signal接收通道sigRecv1。也就是说,signal处理程序是分别处理不同的signal接收通道以及相应的信号集合的。

第二个阶段,我们要分别用两条for语句从signal接收通道sigRecv1sigRecv2中接收信号值。由于这两条for语句都会被阻塞,所以我们不得不让它们并发执行。这需要用到我们还没有正式讲过的go语句。go语句与defer语句的组成很类似,即包含关键字、单条语句或函数以及调用符号。与go语句有关的知识我们在下一章再细说。在这里,我们只需要知道它会被并发的执行其中的单条语句或函数就可以了。此外,我们需要示例函数在这两段被并发执行的程序都执行完毕之后再退出执行。因此,这里还用到了标准库代码包sync中的类型WaitGroup。请看这个阶段的代码:

  1. var wg sync.WaitGroup
  2. wg.Add(2)
  3. go func() {
  4. for sig := range sigRecv1 {
  5. fmt.Printf("Received a signal from sigRecv1: %s\n", sig)
  6. }
  7. fmt.Printf("End. [sigRecv1]\n")
  8. wg.Done()
  9. }()
  10. go func() {
  11. for sig := range sigRecv2 {
  12. fmt.Printf("Received a signal from sigRecv2: %s\n", sig)
  13. }
  14. fmt.Printf("End. [sigRecv2]\n")
  15. wg.Done()
  16. }()

简单来说,我们会先调用sync.WaitGroup类型值wgAdd方法添加一个值为2的差量。然后,在每段并发程序的最后再调用wgDone方法。这个方法的作用可以被视为使差量减1。在该示例函数的最后,我们再调用这个值的Wait方法。该方法会被一直阻塞直到差量变为0。这就相当于实现了我们刚刚描述的那个功能。

到了第三个阶段,我们不想从signal接收通道sigRecv1接收并自行处理信号了。换句话说,我们不再需要让signal处理程序向signal接收通道sigRecv1中发送信号了。不过,我们并不想马上这样做而是要先等待两秒钟。这使得我们能够有时间测试被删减之前的信号自行处理流程。这一功能可以通过标准库代码包timeSleep函数来实现。第三阶段的代码如下:

  1. fmt.Println("Wait for 2 seconds... ")
  2. time.Sleep(2 * time.Second)
  3. fmt.Printf("Stop notification... ")
  4. signal.Stop(sigRecv1)
  5. close(sigRecv1)
  6. fmt.Printf("done. [sigRecv1]\n")

最后一个阶段只包含一条语句:

  1. wg.Wait()

我们刚刚讲过这条语句的作用。这会避免该示例函数被提前退出执行。那样的话,我们就无法完整地演示信号自行处理的全过程了。

我把这个示例函数命名为sigHandleDemo并放到goc2p项目的multiproc/signal代码包的命令源码文件signal.go中。然后,我在该文件的main函数中添加了针对该示例函数的调用语句。最后,打开一个命令行终端,进入到multiproc/signal代码包所在的目录,使用go run命令运行命令源码文件signal.go。图6-9展现了这一演示的效果。

{%}

图 6-9 第一个信号示例的演示效果

注意,图中的这行内容

  1. Stop notification... done. [sigRecv1]

是在第三个阶段的代码被执行后被打印出来的。这意味着,程序完成了取消掉与signal接收通道sigRecv1对应的信号自行处理行为以及关闭signal接收通道sigRecv1的操作。在这之前,我们无论从键盘上键入Ctrl-c(会导致向当前进程发送SIGINT信号)还是Ctrl-\(会导致向当前进程发送SIGQUIT信号),都只会由我们自己提供的代码进行处理。并且,我们按下Ctrl-\之后会有两行内容被打印出来,即

  1. ^\Received a signal from sigRecv1: quit
  2. Received a signal from sigRecv2: quit

这意味着,signal接收通道sigRecv1sigRecv2都被发送了与SIGQUIT信号对应的信号值,并且与之相应的for语句也从中接收到了该信号值。但是,当sigRecv1被关闭之后,由这行内容

  1. End. [sigRecv1]

可以看出,相应的for立即被退出执行。在这之后,我们再次按下Ctrl-\只会使sigRecv2被发送信号值。而我们再按下Ctrl-c则导致当前进程直接被停止。这是由于与sigRecv2对应的信号集合中并没有SIGINT信号。

我们纵观os/signal代码包中的这两个函数的行为特征就能够看出,它们都是以signal接收通道为唯一标识来对相应的信号集合进行处理的。在signal处理程序的内部,存在一个包级私有的字典(以下称为信号集合字典)。该信号集合字典被用于存放以signal接收通道为键、以信号集合的变体为元素的键值对。当我们调用signal.Notify函数的时候,signal处理程序就会在信号集合字典中查找相应的键值对。如果键值对不存在,就向信息集合字典添加这个新的键值对,否则就更新该键值对中的信息集合的变体。前者相当于向signal处理程序注册一个信号接收的申请,而后者则相当于更新该申请。signal接收通道作为函数调用方接收信号的唯一途径,也理所应当地成为了这些申请的标识。也许读者已经猜到,当我们调用signal.Stop函数的时候,signal处理程序会删除掉信息集合字典中以该函数的参数值(某个signal接收通道)为键的键值对。

当接收到一个发送给当前进程且已被标识为应用程序想要自行处理的操作系统信号之后,signal处理程序会对它进行封装,然后遍历信息集合字典中的所有键值对,并查看它们的元素中是否包含了该信号。如果该信号被包含,那么就会立即把它发送给作为键的signal接收通道。这也进一步地解释了当我们多次调用signal.Notify函数且以不同的signal接收通道作为其参数值的时候所发生的事情。

总之,signal接收通道在Go语言提供的操作系统信号通知机制中起到了举足轻重的作用。我们能否合理地处理操作系统信号,也基本在于signal接收通道的初始化和使用的方式。

好了,我们现在已经对如何开始和停止自行处理接收到的信号有了足够多的了解。不过,Go语言程序能做的可不止这些。我们还可以编写向一个进程发送信号的程序。这需要用到标准库代码包os中的一些API。更具体地说,这主要依靠结构体类型os.Process和相关的函数和方法。

首先,我们可以使用os.StartProcess函数启动一个进程,或者使用os.FindProcess函数查找一个进程。这两个函数都会返回一个*os.Process类型的值(以下简称进程值)和一个error类型值。然后,我们可以调用该进程值的Signal方法来向该进程发送一个信号。进程值的Signal方法接受一个os.Signal类型的参数值并会返回一个error类型值。

我们现在就以第一个信号示例为依托,来演示怎样编写向进程发送信号的Go语言程序。我把这些程序存放到了mysignal.go文件中的sigSendingDemo函数中。

为了演示第二个示例,我们不能直接使用go run命令运行mysignal.go文件。正确的做法是,先使用go build命令编译该文件,然后再执行刚刚被生成在当前目录的可执行文件mysignal。至于为什么要这么做,我们一会儿再解释。

执行mysignal文件会使操作系统生成一个进程。在这里,我们称这个进程为演示进程。好了,我们就以演示进程作为信号发送的目标。在第二个示例中,我们打算完成这样几个操作。

  1. 执行一系列操作系统命令并获得演示进程的进程ID。当然,前提是演示进程已经被生成。

  2. 根据演示进程的ID初始化一个进程值。

  3. 使用该进程值之上的API向对应的进程发送一个SIGINT信号。

  4. 在标准输出上打印出演示进程已接收到信号的凭证。

还记得吗?我们在第一个示例中已经实现了第4个操作。注意,这两个与信号有关的示例是在同一个命令源码文件中的。换句话说,发送信号的程序和自行处理信号的程序都会被包含在与演示进程对应的程序中。因此,第二个示例中的代码要做的就是给自己的进程发送一个SIGINT信号。而第一个示例中的代码会在收到该信号之后向标准输出打印一行内容。当我们在命令行终端下执行mysignal文件之后,这两个示例中打印的所有内容都会出现在当前的标准输出上。

好了,第二个示例的功能需求我们已经了解了。现在我们开始编写代码。首先要做的是获取当前进程的进程ID。这完全可以由Linux操作系统命令(或者说shell命令)来实现,但需要用到多个命令和匿名管道。这一系列命令最终被确定为:

  1. ps aux | grep "mysignal" | grep -v "grep" | awk '{print $2}'

这其中用到了ps命令、grep命令和awk语言。这行命令由4个独立的shell命令组成。它们之间由匿名管道连接。这行命令的含义就是找到依据示例程序而生成并启动的进程的信息,然后找到信息中的进程ID。读者可以自己实验一下这行命令。当然,找到这个进程ID的前提是已经在其他命令行终端下或开发环境中生成并执行了mysignal文件。

我们依照此行shell命令创建一个*exec.Cmd类型值(以下简称命令值)的切片值,像这样:

  1. cmds := []*exec.Cmd{
  2. exec.Command("ps", "aux"),
  3. exec.Command("grep", "mysignal.go"),
  4. exec.Command("grep", "-v", "grep"),
  5. exec.Command("awk", "{print $2}"),
  6. }

由于我们在上一小节已经介绍了很多关于创建和使用命令值的知识,所以读者应该能看明白上面的这几行代码。

为了按顺序地执行前面的那行shell命令的并得到演示进程的进程ID,我们需要使用上一小节所讲到的相关知识来编写一些代码。这些代码被封装在了一个名为runCmds的函数中。该函数的声明如下:

  1. func runCmds(cmds []*exec.Cmd) ([]string, error)

这个函数接受一个代表了命令值列表的切片值作为参数,并返回一个代表了进程ID列表的[]string类型值和一个error类型值。它的函数体中的代码作者就不在此展示了。这会是一个很好的练习题。读者可以借此再复习一下怎样用Go语言程序实现串联命令的管道。

我们调用runCmds函数获得进程ID列表,像这样:

  1. output, err := runCmds(cmds)
  2. if err != nil {
  3. fmt.Printf("Command Execution Error: %s\n", err)
  4. return
  5. }

由于os.FindProcess只接受一个int类型的参数值,所以我们还需要把output变量的值中的string类型值都转换为int类型值。这种转换非常容易,因为我们仅仅使用标准库代码包strconv中的Atoi函数就可以做到。

假设我们已经完成了上述转换并把结果赋给了[]int类型的变量pids。然后,我们使用for语句在pids之上进行迭代,并把每次迭代出来的值都赋给迭代变量pid。对于每一个pid的值,我们都可以使用代码

  1. proc, err := os.FindProcess(pid)

得到进程值。在这里,我们把获取到的进程值赋给了变量proc。然后通过调用它的Signal方法给该值对应的进程发送信号,像这样:

  1. err = proc.Signal(syscall.SIGINT)

顺便说一句,如果在本示例中向演示进程发送的是SIGKILL信号,那么我们调用进程值的Kill方法也可以达到相同的目的。

我们现在来解答为什么非要生成可执行文件mysignal,而不是直接使用go run命令来运行命令源码文件mysignal.go的问题。go run命令程序中会执行一系列动作,为最后的Go语言程序的运行做准备。粗略地讲,这包括了依赖查找、编译、打包、链接这几个步骤。当这些步骤完成之后,会有一个与被运行的命令源码文件的主文件名同名的可执行文件被生成在相应的临时工作目录中。对于命令源码文件mysignal.go来说,该可执行文件的名称就是mysignal。实际上,这与使用go build命令生成的可执行文件是一致的。在最后,命令程序会执行可执行文件mysignal。但是,需要注意,为了执行mysignal而产生的进程是一个全新的进程。它与代表了go run mysignal.go命令的那个进程毫不相干。也就是说,这两个进程是相互独立的。它们都拥有自己的进程ID。

因此,在这种情况下,我们使用前面的那行shell命令会把这两个进程的ID都输出出来。又由于进程信息列表的排列顺序问题,与go run mysignal.go命令对应的那个进程的信息往往会出现在前面。所以,我们以顺序遍历进程ID列表的话,第二个示例中发送的信号会先到达与这个命令程序对应的进程。请注意,我们使用go run命令运行mysignal.go,命令程序会生成并执行可执行文件mysignal,然后该可执行文件所产生的输出会通过该命令程序打印到标准输出上。也就是说,在该命令程序被挂起、停止或终止之后,mysignal中的程序所打印的内容也再不会出现在标准输出上了。因此,演示进程在接收到SIGINT信号之后的打印内容自然就不会被展示出来了。

为了消除这种影响,我们才有了前面的那个需要先编译后再执行可执行文件的要求。不过,这个要求限制了我们演示示例的方式。这种限制可能会让人生厌。实际上,一个更好的方法是使用shell命令

  1. grep -v "go run"

来过滤掉原先进程信息列表中的与go run命令对应的进程。这样,被用于查找演示进程的shell命令就变成了这样:

  1. ps aux | grep "mysignal" | grep -v "grep" | grep -v "go run" | awk '{print $2}'

而对应的命令值的切片值的声明也会变为:

  1. cmds := []*exec.Cmd{
  2. exec.Command("ps", "aux"),
  3. exec.Command("grep", "mysignal"),
  4. exec.Command("grep", "-v", "grep"),
  5. exec.Command("grep", "-v", "go run"),
  6. exec.Command("awk", "{print $2}"),
  7. }

好了,我们用这条声明cmds的语句替换掉原来的语句就可以了。

至此,sigSendingDemo函数中的代码我们也已经编写完毕了。现在我们要修改一下mysignal.go文件中的main函数,以使得sigHandleDemo函数和sigSendingDemo函数可以被并发的执行。这样,我们就可以看到比较好的演示效果了。我们依然运用go语句和time.Sleep函数来达到这一目的。main函数的完整声明如下:

  1. func main() {
  2. go func() {
  3. time.Sleep(5 * time.Second)
  4. sigSendingDemo()
  5. }()
  6. sigHandleDemo()
  7. }

main函数中,我们并发的执行了sigSendingDemo函数。不过在执行它之前先等待了5秒钟以确保sigHandleDemo函数中的流程已经在

  1. wg.Wait()

语句处阻塞。最后,我们来运行这个包含了信号自行处理功能和信号发送功能的完整示例。其演示效果如图6-10所示。

{%}

图 6-10 完整的信号示例的演示效果

请看在该命令行终端中最后出现的那两行内容。倒数第二行内容是第二个示例中的代码打印出来的。这表示即将要向演示进程发送SIGINT信号。倒数第一行内容是第一个示例中的代码打印出来的,表示演示进程已经接收到SIGINT信号并自行处理了。

看到下面的那个正在高亮的光标了吗?这表明演示进程并未结束执行。signal接收通道sigRecv2也没有被关闭,被用于从中接收信号值的for语句还一直处于阻塞状态。

经过本小节的一番讲解,相信读者已经基本掌握了使用Go语言自行处理和发送操作系统信号的方法。我们可以通过这些非常方便和灵活地使用操作系统信号。例如,我们可以在进程被终止前释放所持的系统资源和持久化一些重要数据。又例如,我们可以在当前进程中有效的控制其他相关进程的状态。

信号与管道都被称为基础的IPC方法。由于当今的主流操作系统对它们都有所支持,因此Go语言作为一种跨平台的计算机编程语言,自然也就把操纵它们的方法囊括在了标准库中。Go语言为我们提供了关于它们的更高层次的抽象方法和API。这使得我们可以在任何可以安装了Go语言的操作系统下用相同的种方式使用这些系统级别的功能。这也是Go语言为我们带来的便利之一。不过,需要注意的是,在基于数据传递的解决方案中,保证数据的原子性是非常重要的。然而,管道并不提供这种原子性保证。即使是Go语言标准库中提供的相关API也没有附加这种保证。

6.2.5 Socket

Socket,常被译为套接字。它也是一种IPC方法。但是与我们之前讲述的那几种IPC方法不同的是,它是通过网络连接来使两个或更多的进程建立通讯并相互传递数据的。这使得进行通讯的双方是否在同一台计算机上变得无关紧要。实际上,这是Socket的目标之一——使通讯端的位置透明化。

注意,本小节的内容会涉及一些TCP/IP协议栈的知识。但由于篇幅原因,我们并没有在这里的展开它们。读者若需要进一步了解它们,请参阅有关的文档和教程。

1. Socket的基本特性

在当今的大多数操作系统中都包含了Socket接口的实现。在主流以及更多的编程语言中也都有自己的基于Socket的API。当然,Go语言也不例外。

我们从操作系统提供的Socket接口开始讲起。在Linux操作系统中,存在一个名为socket的系统调用。其声明如下:

  1. int socket(int domain, int type, int protocol);

该系统调用的功能是创建一个Socket实例。它接收3个参数。这3个参数分别代表了这个Socket的通讯域、类型和所用协议。

每个Socket都必将存在于一个通讯域当中。Socket的通讯域决定了该Socket的地址格式和通讯范围,参见表6-1。

表6-1 Socket的通讯域

通讯域含义地址形式通讯范围
AF_INETIPv4域IPv4地址(4个字节),端口号(2个字节)在基于IPv4协议的网络中的任意两台计算机之上的两个应用程序
AF_INET6IPv6域IPv6地址(16个字节),端口号(2个字节)在基于IPv6协议的网络中的任意两台计算机之上的两个应用程序
AF_UNIXUnix域路径名称在同一台计算机上的两个应用程序

由上表可知,Linux操作系统提供的Socket的通讯域有3个,即AFINET、AF_INET6和AF_UNIX。它们分别代表了IPv4域、IPv6域和Unix域。这3个域的标识符都以“AF”为前缀。“AF”是address family的缩写,意为地址族。这也暗示了每个域的Socket地址格式的不同。另外,我们还可以了解到,IPv4域和IPv6域的通讯是在网络范围内的,而Unix域的通讯则是在单台计算机范围内的。

Socket的类型有很多,包括SOCK_STREAM、SOCK_DGRAM、面向更底层的SOCK_RAW,以及针对某个新兴数据传输技术的SOCK_SEQPACKET。这些Socket类型的相关特性如表6-2所示。

表6-2 Socket类型的特性

特性Socket类型
SOCK_DGRAMSOCK_RAWSOCK_SEQPACKETSOCK_STREAM
数据形式数据报数据报字节流字节流
数据边界没有
逻辑连接没有没有
数据有序性不能保证不能保证能够保证能够保证
传输可靠性不具备不具备具备具备

表6-2呈现了不同Socket类型的5个特性。

数据形式有两种:数据报和字节流。以数据报为数据形式意味着数据接收方的Socket接口程序可以意识到数据的边界并会对它们进行切分。这样就省去了接收方的应用程序寻找数据边界和切分数据的工作量。以字节流为数据形式的数据传输实际上传输的是一个字节接着一个字节的串。我们可以把它想象成一个很长的字节数组。一般情况下,字节流并不能体现出其中的哪些字节属于哪个数据包。因此,Socket接口程序是无法从中分离出独立的数据包的。这一工作只能由应用程序去完成。然而,SOCK_SEQPACKET类型的Socket的接口程序却截然不同。数据发送方的Socket接口程序可以忠实地记录数据边界。这里的数据边界就是应用程序每次发送的字节流片段之间的分界点。这些数据边界信息会随着字节流一同被发往数据接收方。数据接收方的Socket接口程序会根据数据边界把字节流切分成(或者说还原成)若干个字节流片段并按照需要依次传递给应用程序。

面向有连接的Socket之间在进行数据传输之前必须要先建立逻辑连接。在连接被建立好之后,通讯双方可以很方便地互相传输数据。并且,由于连接已经暗含了双方的地址,所以在传输数据的时候不必再指定目标地址。从另一个角度看,两个面向有链接的Socket之间一旦建立连接,那么它们发送的数据就只能被发送到连接的另一端。然而,面向无连接的Socket则完全不同。这类Socket在进行通讯时无需建立连接。它们传输的每一个数据包都是独立的,并且会直接被发送到网络上。在这些数据包中都含有目标地址,因此每个数据包都可能被传输至不同的目的地。此外,在面向无连接的Socket之上的数据流只能是单向的。也就是说,我们不能使用同一个面向无连接的Socket实例既发送数据又接收数据。

数据传输的有序性和可靠性与Socket是否面向连接有很大的关系。正因为逻辑连接的存在,通讯双方才有条件通过一些手段(比如基于TCP协议的序列号和确认应答,等等)来保证从数据发送方发送的数据能够及时、正确、有序地到达数据接收方,并被接收方接受。

最后要注意,SOCK_RAW类型的Socket提供了一个可以直接通过底层(TCP/IP协议栈中的网络互联层)传送数据的方法。为了保证安全性,应用程序必须具有操作系统的超级用户的权限才能够使用这种方式。并且,该方法的使用成本也相对较高,因为应用程序一般需要自己构建数据传输格式(像TCP/IP协议栈中的TCP协议的数据段格式和UDP协议的数据报格式那样)。因此,应用程序一般极少使用这种类型的Socket。

我们在调用系统调用socket的时候,一般会把0作为它的第三个参数值。其含义是让操作系统内核根据第一个参数和第二个参数的值自行决定Socket所使用的协议。这也意味着Socket的通讯域和类型与所用协议之间是存在对应关系的。这来通过表6-3来了解一下这种对应关系。

表6-3 Socket所用协议的默认选择

决定因素SOCK_DGRAMSOCK_RAWSOCK_SEQPACKETSOCK_STREAM
AF_INETUDPIPv4SCTPTCP或SCTP
AF_INET6UDPIPv6SCTPTCP或SCTP
AF_UNIX有效无效有效有效

在表6-3中,TCP(Transmission Control Protocol,中文译作传输控制协议)、UDP(User Datagram Protocol,中文译作用户数据报协议)和SCTP(Stream Control Transmission Protocol,中文译作流控制传输协议)都是TCP/IP协议栈中的传输层协议,而IPv4和IPv6则分别代表了TCP/IP协议栈中的网络互连层协议IP(Internet Protocol,中文译作网际协议)的第4个版本和第6个版本。“有效”表示该通讯域和类型的组合会使内核选择某个内部的Socket协议。“无效”则表示该通讯域和类型的组合是不合法的。在Go语言提供的Socket编程API中也会涉及这些组合,并有一些专用的字符串字面量来表示它们。

现在我们来看系统调用socket的返回值。在没有发生任何错误的情况下,系统调用socket会返回一个int类型的值。该值是作为socket唯一标识符的文件描述符。在得到该标识符之后,我们就可以调用其他系统调用来进行各种相关操作了,比如,绑定和监听端口、发送和接收数据以及关闭Socket实例,等等。不过,由于篇幅原因,我们就不在这里介绍那些系统调用的用法了。

注意,我们一直在说通过系统调用来使用操作系统提供的Socket接口。这就意味着,Socket接口程序与TCP/IP协议栈的实现程序一样,是Linux操作系统内核的一部分。

2. 基于TCP/IP协议栈的Socket通讯

我们已经知道,Socket接口既可以提供网络中的不同计算机上的多个应用程序间的通讯支持,也可以成为单台计算机上的多个应用程序间通讯的手段。虽然如此,但是我们使用Socket接口的绝大多数情况都是为了在网络中的进行通讯。这样的通讯是基于TCP/IP协议栈的。

图6-11表明了Socket接口与TCP/IP协议栈以及操作系统内核的关系。

{%}

图 6-11 Socket接口与TCP/IP协议栈

通过基于TCP/IP协议栈的Socket接口,我们不但可以建立和监听TCP连接和UDP连接,甚至还可以直接与网络互联层的IP协议实现程序进行通讯。不过,我们并不打算详细描述后者。因为,绝大多数应用程序需要的仅仅是与传输层的程序打交道。

在本小节中,我们会利用Go语言提供的Socket编程API来编写一个较完整的示例,以试图让读者学会使用它们。这个示例包含了两个在概念上独立的程序,即服务端程序和客户端程序。服务端程序会在一个给定的端口上监听TCP连接,而客户端程序则会试图与这个服务端程序建立TCP连接并进行通讯。为了让读者对基于TCP/IP协议栈的Socket通讯有一个宏观上的认识,我们绘制了一张流程图,如图6-12所示。它展现了TCP服务端和TCP客户端通过操作系统的Socket接口建立TCP连接并进行通讯的一般情形。其中不但涉及了我们在前面提到过的系统调用socket,还包含了一些我们并没有讲到的系统调用。

图6-12所示的只是一个极其简单的通讯流程。服务端程序在创建Socket实例、绑定本地地址、监听地址之后开始等待连接的接入。在之后的某个时刻,客户端程序也创建了一个Socket实例并试图与服务端程序建立TCP连接。服务端程序接收到客户端程序发出的TCP连接请求并随即与它建立连接。客户端程序向服务端程序发送了请求数据,服务端程序在接收到并处理了该请求之后也向客户端程序发送了响应数据。客户端程序接收到了它想要的响应数据,并关闭了TCP连接。这时,客户端程序所在的操作系统的内核会通知服务端程序。服务端程序在接到通知之后会立即关闭相对应的TCP连接。在实际的应用场景中,通讯双方会进行多次数据交互。也就是说,图6-12中在圆角框之内的子流程一般会循环很多次。

{%}

图 6-12 基于TCP/IP协议栈的Socket通讯的一个简单流程

为了使用Go语言程序实现上面所说的服务端程序和客户端程序,我们主要会使用到标准库代码包net中的API。首先,我们会用到这个函数:

  1. func Listen(net, laddr string) (Listener, error)

函数net.Listen被用于获取一个监听器。它接受两个string类型的参数。第一个参数的含义是以何种协议来在给定的地址上监听。我们在稍前的内容中已经介绍过Socket可能使用的协议。在Go语言中,这些协议由一些字符串字面量来表示,如表6-4所示。

表6-4 代表Socket协议的字符串字面量

字面量Socket协议备注
"tcp"TCP
"tcp4"TCP网络互联层协议仅支持IPv4
"tcp6"TCP网络互联层协议仅支持IPv6
"udp"UDP
"udp4"UDP网络互联层协议仅支持IPv4
"udp6"UDP网络互联层协议仅支持IPv6
"unix"有效可看作是在通讯域为AF_UNIX且类型为SOCK_STREAM的时候内核采用的默认协议
"unixgram"有效可看作是在通讯域为AF_UNIX且类型为SOCK_DGRAM的时候内核采用的默认协议
"unixpacket"有效可看作是在通讯域为AF_UNIX且类型为SOCK_SEQPACKET的时候内核采用的默认协议

函数net.Listen的第一个参数的值所代表的必须是面向流的协议。TCP和SCTP都属于面向流的传输层协议。但不同的是,TCP协议实现程序无法记录和意识到任何消息边界,也无法从字节流分离出消息,而SCTP协议实现程序却可以做到这些的。后者使得应用程序无需再在发送的字节流的中间加入额外的消息分隔符,也无需再去查找所谓的消息分隔符并据此对字节流进行切分。保存消息边界的这种做法有利有弊,因此TCP协议和SCTP协议也各自适用于不同的场景。不过,二者皆适用的情况也是存在的。

解释一下,消息是数据包在TCP/IP协议栈的应用层中的称谓。消息边界与我们前面所说的数据边界的含义基本相同。这两者的不同之处在于,消息边界仅仅针对消息,而数据边界针对的对象的范围更广。还要注意,数据段是TCP协议实现程序为了使数据流满足网络传输的要求而做的分段,与这里所说的被用于区分独立消息的消息边界毫不相关。

综上所述,net.Listen函数的第一个参数的值必须是tcptcp4tcp6unixunixpacket中的一个。它们代表的都是面向流的协议。其中,tcp4tcp6分别仅与基于IPv4协议的TCP协议和基于IPv6协议的TCP协议相对应,而tcp则表示Socket所用的TCP协议会(或者说应该)兼容这两个版本的IP协议。另外,unixunixpacket分别代表了两个通讯域为Unix域的内部的Socket协议。遵循它们的Socket实例仅被用于在本地计算机上的不同应用程序之间的通讯。

对于基于TCP协议的Socket来说,net.Listen函数的第二个参数laddr的值代表了当前程序在网络中的标识。laddr是Local Address的简写形式。它的格式是“host:port”。其中,“host”代表IP地址或主机名,而“port”则代表当前程序欲监听的端口号。例如,127.0.0.1:8085。注意,在“host”处的内容必须是与当前计算机对应的IP地址或主机名,否则在调用该函数的时候会造成一个错误。另外,如果在“host”处的是主机名,那么该API中的程序(以下简称API程序)会先通过DNS(Domain Name System,中文译作域名系统)找到与该主机名对应的IP地址。因此,若“host”处的主机名没有在DNS中注册,那么也同样会造成一个错误。

好了,现在我们可以迈出构建一个基于TCP协议的服务端程序的第一个步了,像这样:

  1. listener, err := net.Listen("tcp", "127.0.0.1:8085")

函数net.Listen返回两个结果值。第一个结果值是net.Listener类型的。它就是我们欲获取的监听器。第二个结果值是一个error类型值。它代表可能出现的错误。当然,和往常一样,我们需要先判断变量err是否为nil。若判断结果为真,则说明以给定的协议在给定的地址上的监听无法开始。这时,我们往往应该先去检查传递给net.Listen函数的两个参数值的合法性。否则,我们就可以开始等待客户端的连接请求了,代码如下:

  1. conn, err := listener.Accept()

当我们调用net.Listener类型值的Accept方法的时候,流程会被阻塞,直到某台计算机上的某个应用程序与当前程序建立了一个TCP连接。此时,Accept方法会返回两个结果值。第一个结果值是代表了当前TCP连接的net.Conn类型值,而第二个结果值依然是一个error类型值。我们依旧要先对第二个结果值进行检查。

为了让这个不为人知的服务端程序具有意义,我们在继续编写服务端程序之前先来了解一下怎样才能与一个服务端程序建立TCP连接,并实现一个客户端程序。

代码包net中的Dial函数可被用于向网络中的某个地址发送数据,它的声明如下:

  1. func Dial(network, address string) (Conn, error)

函数net.Dial也接受两个参数。其中,networknet.Listen函数的第一个参数net含义非常类似。它比后者拥有更多的可选值。因为,发送数据之前不一定要先建立连接。像UDP协议和IP协议就都是面向无连接型的协议。因此,udpudp4udp6ip, ip4ip6都可以作为参数network的值。其中,udp4udp6分别代表了仅基于IPv4协议的UDP协议和仅基于IPv6协议的UDP协议,而udp所代表的UDP协议则在IP协议的版本上没有任何限制。另外,unixgram也是network参数的可选值之一。与unixunixpacket相同,unixgram也代表了一个基于Unix域的内部Socket协议。但不同的是,后者是以数据报作为传输形式的。

函数net.Dial的第二个参数address的含义与net.Listen函数的第二个参数laddr完全一致。如果我们想与前面刚刚开始监听的服务端程序连接的话,那么这个参数的值就应该是该服务端的地址,即为127.0.0.1:8085。因此,这个参数的名称address其实也可由raddr(Remote Address)代替。名称laddrraddr都是相对的。前者指的是当前程序所使用的地址(本地地址),而后者则指的是参与通讯的另一端所使用的地址(远程地址)。我们会在net代码包的函数或方法声明中经常见到这两个参数名称。

有的读者可能会问:客户端自己的地址在哪里给出呢?答案是根本不用给出。端口号可以由应用程序指定,也可以由操作系统内核动态分配。就使用net.Dial建立Socket连接的客户端程序而言,它占用的端口号是由操作系统内核动态分配的。另一方面,客户端程序的地址中的“host”一定是本地计算机的主机名或IP地址,这也会由操作系统内核为我们指定。当然,我们也可以自己去指定当前程序的地址,不过这就需要使用另外的函数建立连接了。我们后面再探讨这个问题。

调用net.Dial函数的代码类似于:

  1. conn, err := net.Dial("tcp", "127.0.0.1:8085")

函数net.Dial返回两个结果值。一个是net.Conn类型值,另一个是error类型值。同样的,若参数值不合法,则第二个结果值会不为nil。此外,对基于TCP协议的连接请求来说,当在远程地址之上并没有程序正在监听的时候,也会使net.Dial函数返回一个非nilerror类型值。

我们都知道,网络中是存在延时现象的。因此,在收到另一方的有效回应(无论连接成功或失败)之前,发送连接请求的一方往往会等待一段时间,在上面的示例中则表现为流程在调用net.Dial函数的那行代码上一直阻塞。在超过这个等待时间之后,函数的执行就会结束并返回相应的error类型值。因此,这类等待时间也常被称为超时(timeout)时间。不同操作系统对基于不同协议的连接请求的超时时间都有不同的设定。例如,在Linux操作系统内核中,把基于TCP协议的连接请求的超时时间设定为75秒。与其他超时时间相比,这已经算是很短了。在很多应用场景中,固定不变的超时时间往往无法满足需求。我们总是希望掌控能够掌控的一切,在编写代码时也不例外。因此,操作系统内核为我们提供了改变这类超时时间的接口。同时,在Go语言的net代码包中也存在相应的API。对于net.Dial函数来说,可同时设定超时时间的函数为net.DialTimeout。它的声明如下:

  1. func DialTimeout(network, address string, timeout time.Duration) (Conn, error)

该函数与net.Dial函数的唯一区别就是可以同时对连接请求的超时时间进行设定。我们可以看到,net.DialTimeout函数声明中的最后一个参数是被专门用于设定超时时间的。它的类型是time.Duration,单位是纳秒。但是,我们设定的超时时间一般会比纳秒级别高好几个数量级。不过不用担心,在标准库代码包time中,预先声明了与常用的时间单位相对应的time.Duration类型的常量。time.Duration类型是int64类型的一个别名类型。所以,不严谨地说,time.Duration相当于一个数值类型。在设定time.Duration类型的值的时候,我们可以直接使用它们来拼凑需要的时间,而不用再去计算诸如1小时48分73秒等于多少纳秒之类的问题。例如,常量time.Nanosecond代表1纳秒,它的值就是1。而常量time.Microsecond代表1微秒,其值为1000 * Nanosecond,也就是1000纳秒。以此类推。当我们想表示一个时间为2秒的time.Duration类型值时可以这样编写:

  1. 2 * time.Second

如果我们在请求TCP连接的同时想把超时时间设定为2秒,我们可以这样调用net.DialTimeout函数:

  1. conn, err = net.DialTimeout("tcp", "127.0.0.1:8085", 2*time.Second)

至此,我们讲述的API足以让我们在服务端程序和客户端程序之间建立起TCP连接。不过,看起来我们在这里并没有使用操作系统内核提供的API创建Socket实例。的确,这一操作已经被隐含在Go语言提供的Socket API程序中了。此外,在服务端,与本地地址绑定的操作也被隐含在net.Listen函数背后的程序中了。这种在API上的简化是很值得称赞的。虽然这隐藏了处于底层的Socket接口的相关细节,但好在我们通过前面内容已经对Socket接口有所了解了。

在前面,我们在通过调用net.Listen函数得到一个net.Listener类型值之后,又调用该值的Accept方法以等待客户端连接请求的到来。当收到客户端的连接请求之后,服务端会与客户端建立TCP连接(三次握手)。当然,这个连接的建立过程是两端的操作系统内核共同协调完成的。当成功建立连接后,我们会通过从Accept方法得到一个代表了该TCP连接的net.Conn类型值。这就是说,不论服务端程序还是客户端程序,当TCP连接建立完成之后都会得到一个net.Conn类型值。在这之后,通讯两端就可以分别利用各自获得的net.Conn类型值交换数据了。下面我们就来说说API程序在net.Conn类型之上提供的功能。

首先需要说明的是,Go语言的Socket编程API程序在底层获取的是一个非阻塞式的Socket实例。这就是说,我们使用Socket接口在一个TCP连接上的数据读取操作也都是非阻塞式的。在应用程序试图通过系统调用read从Socket的接收缓冲区中读取数据的时候,即使接收缓冲区中没有任何数据,操作系统内核也不会使系统调用read进入阻塞状态,而是直接返回一个错误码为“EAGAIN”的错误。但是,应用程序并不应该视此为一个真正的错误,而是应该忽略该错误然后稍等片刻之后再去尝试读取。另外,如果在读取数据的时候接收缓冲区有数据,那么系统调用read就会携带这些数据立即返回。即使当时的接收缓冲区中只包含了一个字节的数据也会是这样。这一特性被称为部分读(partial read)。另一方面,在应用程序试图向Socket的发送缓冲区中写入一段数据的时候,即使发送缓冲区已被填满系统调用write也不会被阻塞,而是直接返回一个错误码为“EAGAIN”的错误。同样地,应用程序应该忽略该错误并稍后再尝试写入数据。如果发送缓冲区中有少许剩余空间但不足以放入这段数据,那么系统调用write会尽可能地写入一部分数据然后返回已写入的字节的数据量。这一特性被称为部分写(partial write)。应用程序应该每次调用write之后都去检查该结果值,并在发现数据未被完全写入时继续写入剩下的数据。在非阻塞式的Socket接口之下,除了readwrite之外,系统调用accept也会显现出一致的非阻塞风格。它不会被阻塞以等待新连接的到来,而会直接返回错误码为“EAGAIN”的错误。有些读者可能会立即发问:前面说net.Listener类型值的Accept方法会在被调用时阻塞直至新连接的到来,它们与这里所说的非阻塞式的行为并不相符啊?!别急,请读者继续看接下来的说明。

Go语言的Socket编程API程序在一定程度上充当了前面所说的应用程序的角色。它为我们屏蔽了相关系统调用的“EAGAIN”错误。这使得有些Socket编程API调用起来像是阻塞式的。但是,我们应该明确,它在底层使用的是非阻塞式的Socket接口。另外,需要注意的是,Go语言的Socket编程API程序同样为我们屏蔽了非阻塞式Socket接口的部分写特性。相关API直到把所有数据全部写入到Socket的发送缓冲区之后才会返回,除非在写入的过程中发生了某种错误。但是,它却保留了非阻塞式Socket接口的部分读特性,并把它们呈现给了它的使用者(我们编写的应用程序)。这样做是合理的。因为,在TCP协议之上传输的数据是字节流形式的。数据接收方无法意识到数据的边界(也可以说消息边界)。所以,Socket编程API程序也就无从判断函数调用返回的时机。把数据切分和分批返回的任务交给调用方程序也算是最好的选择了。部分读是需要我们在程序中做一些额外的处理的。我们会在后面进一步说明这个问题。

好了,现在让我们重新关注net.Conn类型。它是一个接口类型。在它的方法集合中包含了8个方法。它们定义了我们可以在一个连接上做的所有事情。接下来,我们就逐一地对它们进行说明。

1. Read方法

方法Read被用来从Socket的接收缓冲区中读取数据。下面是该方法的声明:

  1. Read(b []byte) (n int, err error)

该方法接受一个[]byte类型的参数。该参数的值相当于一个被用来存放从连接上接收到的数据的“容器”。它的长度完全由应用程序来决定。Read方法会最多从连接中读取数量等于该参数值的长度的若干字节,并把它们依次放置到该参数值中的相应元素位置(索引值从0len(b)-1)上。该参数值中的相应位置上的原元素值将会被替换。不过,即使是这样,我们也应该让“容器”保持绝对地干净。换句话说,传递给Read方法的参数值应该是一个不包含任何非零值元素的切片值。在一般情况下,Read方法只有在把参数值填满之后才会返回。但是,在有些情况下,Read方法在未填满参数值之前就返回了。这可能是由相关的网络数据缓存机制导致的。我们在前面已经说明过这一问题。不管是什么原因,如果Read方法未填满参数值,而该参数值的靠后部分又存在遗留元素值的话,我们就需要特别小心。好在Read方法返回的第一个结果值可以帮助我们从中识别出真正的数据部分。结果n代表了本次操作实际读取到的字节的个数。我们也可以把它理解为Read方法向参数值中填充的字节的个数。我们可以这样来使用它:

  1. b := make([]byte, 10)
  2. n, err := conn.Read(b)
  3. content := string(b[:n])

我们通过对[]byte类型的结果n的切片来抽取出接收到的数据。即使n的值为0,这样做也不会有任何问题。但是,我们仍然需要通过检查作为结果之一的error类型值来判断函数的执行是否正常结束。读者应该已经非常熟悉这种做法了。不过,我们在这里对错误的检查会稍微复杂一些。

如果Socket编程API程序在从Socket的接收缓冲区中读取数据的时候发现TCP连接已经被另一端关闭了,那么就会立即返回一个error类型值。这个error类型值与io.EOF变量的值是相等的。我们在前面多次接触过io.EOF变量。它的值象征着文件内容的完结。相应地,该值在这里意味着在该TCP连接上再无可被读取的数据。也可以说,该TCP链接已经无用,可以被关闭了。因此,如果Read方法的第二个结果值与io.EOF变量的值相等,那么我们就应该中止后续的数据读取操作,并关闭该该TCP连接。请看下面的代码:

  1. var dataBuffer bytes.Buffer
  2. b := make([]byte, 10)
  3. for {
  4. n, err := conn.Read(b)
  5. if err != nil {
  6. if err == io.EOF {
  7. fmt.Println("The connection is closed.")
  8. conn.Close()
  9. } else {
  10. fmt.Printf("Read Error: %s\n", err)
  11. }
  12. break
  13. }
  14. dataBuffer.Write(b[:n])
  15. }

上面这几行代码较完整地展现了一个在代表TCP连接的net.Conn类型值之上读取数据的流程。首先,我们声明一个bytes.Buffer类型值,并以此来存储将会接收到的所有数据。通过不带任何子句的for语句,我们编写出了一个可以被无限循环执行的代码块。在这个代码块中,我们总是先在变量conn的值上调用Read方法以读取从网络上接收到的数据,并在确定未发生任何错误之后把数据追加到dataBuffer的值中。这可以解决我们前面提到的非阻塞式的Socket接口的部分读特性所带来的问题。另一方面,对于非nilerror类型值,我们还有第二层判断。如果它等于io.EOF变量的值,那么就说明当前连接已经被正常关闭,而不是有真正的错误发生。这时,我们可以打印提示信息,然后在本端也执行关闭连接的操作。否则,我们就应该打印出错误信息。无论第二层判断的结果如何,我们都会终止当前的for语句的执行。当然,在发生读取错误的时候,是否需要终止循环应该根据具体的应用场景来决定。我们在这里展示的是最简单的情况。另一个可能需要调整地方是,我们一般不会在连接被关闭之前无休止地从连接上读取数据。作为一个处在TCP/IP协议栈的应用层的程序,应该负责切分数据并生成有实际意义的消息。即使在最简单的情况下,应用层程序也应该知道怎样在接收到的字节流上进行切分。我们可以按照自己的要求去编写实现切分操作的程序。不过,还有一个更简便的方法。我们可以利用标准库代码包bufio中的API实现一些较复杂的数据切分操作。bufio是Buffered I/O的缩写。顾名思义,bufio代码包中的API提供了与带缓存的I/O操作有关的支持。比如,通过包装不带缓存的I/O类型值的方式增强它们的功能。我们在前面讲管道的时候已经介绍过bufio.NewReader函数的用法。它接收一个io.Reader类型的参数值。由于net.Conn类型实现了接口类型io.Reader中唯一的方法Read,所以它是该接口类型的一个实现类型。因此,我们可以使用bufio.NewReader函数来包装变量conn,像这样:

  1. reader := bufio.NewReader(conn)

在这之后,我们就可以通过调用reader变量的值之上的ReadBytes方法来依次获取经过切分之后数据了。ReadBytes方法接受一个byte类型的参数值。该参数值应该是通讯两端协商一致的那个消息边界。一个关于ReadBytes方法的用法示例如下:

  1. line, err := reader.ReadBytes('\n')

一般情况下,在每次调用ReadBytes方法之后,我们都会得到一段以该消息边界为结尾的数据。当然,在很多时候,消息边界的定位并不是查找一个单字节字符那么简单。比如,HTTP协议中规定,在HTTP消息的头部信息的末尾一定是连续的两个空行,即字符串"\r\n\r\n"。在获取到HTTP消息的头部信息之后,相关程序会通过其中的名为“Content-Length”的信息项的值得到HTTP消息的数据部分的长度。这样,一个HTTP消息就可以被切分出来了。为了满足这些较复杂的需求,bufio代码包为我们提供了一些更高级的API,例如bufio.NewScanner函数、bufio.Scanner类型及其方法,等等。

2. Write方法

方法Write被用来向Socket的发送缓冲区写入数据。下面是该方法的声明:

  1. Write(b []byte) (n int, err error)

该方法背后的API程序为我们屏蔽了很多非阻塞式Socket接口的细节。这使得我们可以简单地调用它而不用再做其他额外的处理,除了需要应对可能会发生的操作超时异常。

同样地,我们也可以使用代码包bufio中的API来使这里的写操作更加灵活。net.Conn类型的Write方法的声明与io.Writer接口类型中的唯一方法Write的声明完全一致。所以,net.Conn类型的值可以作为bufio.NewWriter函数的参数值,像这样:

  1. writer := bufio.NewWriter(conn)

与前面示例中的变量reader类似,writer的值可以被看作是针对变量conn代表的TCP连接的缓冲写入器。我们可以调用其上的以“Write”为名称前缀的方法分批次地向其中的缓冲区写入数据,也可以调用它的ReadFrom方法直接从其他io.Reader类型值中读出并写入数据,还可以通过调用Reset方法以达到重置和复用它的目的。在向其写入全部数据之后,我们应该调用它的Flush方法,以保证其中的所有数据都被真正地写入到了它代理的对象(在这里,这一对象就是由变量conn代表的TCP连接)中。此外,我们应该留心该缓冲写入器的缓冲区容量(默认是4096个字节)。因为,在我们调用以“Write”为名称前缀的方法的时候,如果作为参数值的数据的字节数量超出了此容量,那么该方法就会试图把这些数据的全部或一部分直接写入到它代理的对象中,而不会先在缓冲写入器自己的缓冲区中缓存这些数据。有时候,这并不是我们希望的。为解决此类问题,我们可以通过调用bufio.NewWriterSize函数来初始化一个缓冲写入器。该函数与bufio.NewWriter函数非常类似,但它让我们可以自定义将要生成的缓冲写入器的缓冲区容量。

3. Close方法

方法Close会关闭当前的连接。它不接受任何参数并返回一个error类型值。在调用该方法之后,对该连接值(由示例中的conn变量代表的值)上的Read方法、Write方法或Close方法的任何调用都会使它们立即返回一个error类型值。代表该error类型值的变量已经被预置在了net代码包中,其提示信息是:

  1. use of closed network connection

另外,如果我们在调用Close方法的时候,Read方法和/Write方法正在被应用程序调用且还未执行结束,那么它们也会立即结束执行并返回非nilerror类型值。即使它们正处于阻塞状态也会是这样。

4. LocalAddrRemoteAddr方法

单从名称上来看,我们就能猜到这两个方法的作用了。它们都不接受任何参数并返回一个net.Addr类型的结果。这个结果的值代表了参与当前通讯的某一端的应用程序在网络中的地址。显然,LocalAddr方法返回的是代表了本地地址的net.Addr类型值,而RemoteAddr方法返回的则是代表了远程地址的net.Addr类型值。net.Addr类型是一个接口类型。在它的方法集合中有两个方法——NetworkStringNetwork方法会返回当前连接所使用的协议的名称。例如,在我们所说的这个应用场景中,这条语句

  1. conn.LocalAddr().Network()

会使我们得到"tcp"这个string类型值。String方法返回相应的地址。这个地址与我们前面所说的各个通讯域下的地址的表现形式和格式是对应的。对于IPv4域来说,这个地址的格式就是“host:port”。我们前面讲到的那个基于TCP协议的服务端程序的地址就是"127.0.0.1:8085"。这与我们获取监听器时给定的那个地址是一致的。当一个客户端连接到来时,我们可以通过如下语句获取该连接的另一端的应用程序的网络地址:

  1. conn.RemoteAddr().String()

另一方面,对于客户端程序,如果我们在与服务端程序通讯的时候未指定本地地址,那么这条语句:

  1. conn.LocalAddr().String()

会让我们得到操作系统内核为该客户端程序分配的网络地址。

5. SetDeadlineSetReadDeadlineSetWriteDeadline方法

这3个方法都只接受一个time.Time类型值,并会返回一个error类型值。方法SetDeadline会设定在当前连接上的I/O(包括但不限于读和写)操作的超时时间。注意,这里的超时时间是一个绝对时间!也就是说,如果在SetDeadline方法调用语句之后的相关I/O操作在到达此超时时间的时候还没有完成,那么它们就会被立即结束执行并返回一个非nilerror类型值。这个error类型值由一个被预置在net代码包中的包级私有变量代表。它的Error方法的返回值是"i/o timeout"。注意,当我们以循环的方式不断尝试从一个连接上读取数据的时候,如果想要设定超时时间,那么就需要在每次迭代中的读取数据操作之前都设定一次。这正是因为我们设定的超时时间是一个绝对时间,并且它会对之后的每个I/O操作都起作用。如果在超时时间达来的时候循环语句仍在执行过程中的话,那么在后面的迭代中执行的I/O操作都会失败并返回代表超时的error类型值。请看下面的示例:

  1. b := make([]byte, 10)
  2. conn.SetDeadline(time.Now().Add(2 * time.Second))
  3. for {
  4. n, err := conn.Read(b)
  5. // 省略若干条语句
  6. }

我们通过调用time.Now函数获得代表了当前绝对时间的time.Time类型值,然后调用该值的Add方法在当前绝对时间之上加上了2秒的相对时间。这就意味着,我们把超时时间设定为2秒之后的那一时刻。假设,在之后的for语句块的第二次迭代完成的时候逝去的时间将近2秒,并for语句块第三次迭代开始的时候已经达到了超时时间。这时,在第三次迭代中的读操作会立即失败。并且,后面的迭代中的读操作也必定会相继失败。这样的流程设计显然是不正确的。如果我们把上面的代码改成这样:

  1. b := make([]byte, 10)
  2. for {
  3. conn.SetDeadline(time.Now().Add(2 * time.Second))
  4. n, err := conn.Read(b)
  5. // 省略若干条语句
  6. }

那么只要Read方法的执行能够在2秒内结束,就不会有超时错误出现。这是由于我们在每次迭代的读操作开始之前都先对超时时间进行了延伸。

如果我们不再需要设定超时时间了,那就应该及时取消掉它,以免干扰后续的I/O操作。这一操作可以通过调用同样的方法来实现。如果给予SetDeadline方法的参数值为time.Time类型的零值,超时时间就会被取消掉。由于time.Time是一个结构体类型,所以我们用time.Time{}来表示它的零值。因此,下面代码会取消掉之前对超时时间的设定:

  1. conn.SetDeadline(time.Time{})

读者肯定已经猜到了,SetReadDeadline方法和SetWriteDeadline方法的功能也是设定之后的I/O操作的超时时间。但不同的是,它们仅分别针对于读操作和写操作。这里说的读操作与连接值的Read方法的调用对应,而写操作则与连接值的Write方法的调用对应。对于写操作的超时,有一个问题需要明确。即使一个写操作(也就是对Write方法的调用)超时了,也不一定代表写操作完全没有成功。因为,在超时之前,Write方法背后的程序可能已经将一部分数据写到Socket的发送缓冲区了。也就是说,即使Write方法因操作超时而被迫结束执行并返回,它的第一个结果值也可能大于0。这时,该结果值就代表了在操作超时之前被真正写入的数据的字节数量。

另外,我们对SetDeadline方法的调用相当于先后以同样的参数值对SetReadDeadline方法和SetWriteDeadline方法进行调用。如果我们想统一设定所有相关的I/O操作的超时时间,那么使用SetDeadline方法肯定是便捷的。但当我们需要更细致的控制操作超时的时候,就需要用到后两个方法了。总之,这3个方法为我们提供了不同粒度的I/O操作超时时间控制方法。最后,要记住,它们仅针对在当前连接值之上的I/O操作。

好了,我们现在对net.Conn接口类型上的所有方法也都有所了解了。现在,我们通过一个较完整的示例把这些知识和用法贯穿起来。

该示例包含服务端程序和客户端程序。它们以网络和TCP协议作为通讯的基础。服务端程序的功能可以被概括为:接收客户端程序的请求、计算请求数据的立方根,并把对结果的描述返回给客户端程序。下面是对服务端程序的功能需求更详细的描述。

  • 需要根据事先约定好的数据边界把接收到的请求数据切分成数据块。

  • 仅接受可以由int32类型表示的请求数据数据块。对于不符合要求的数据块,要生成错误信息并返回给客户端程序。并且,发送给客户端程序的每块响应数据都应该带有约定好的数据边界。

  • 对于每个符合要求的数据块,需要依次计算它们的立方根、生成结果描述并返回给客户端程序。

  • 需要鉴别闲置的通讯连接并主动关闭它们。闲置连接的鉴别依据是:在过去的10秒钟内,没有任何数据经该连接被传送到服务端程序。这可以非常有效地减少相关资源的消耗。

客户端程序的功能相对简单一些,可以被概括为:向服务端程序发送若干个代表了int32类型值的请求数据,接收服务端程序返回的响应数据并记录它们,下面是一些细节。

  • 发送给服务端程序的每块请求数据都应该带有约定好的数据边界。

  • 需要根据事先约定好的数据边界把接收到的响应数据切分成数据块。

  • 在获得所有期望的响应数据之后,应该及时关闭连接以节省资源。

  • 需要严格限制耗时,从开始向服务端程序发送请求数据到接收到所有期望的响应数据,其耗时不应该超过5秒钟,否则应该在报告超时错误之后关闭连接。这实际上是对服务端程序的响应速度的检验。

除上述需求之外,我们还希望把服务端程序和客户端程序放置在同一个命令源码文件中。所以,在实现它们的时候,我们不得不使用一些Go语言提供的并发和同步的手段,以使得它们能够并发地运行和适时地结束。别担心,这些方法我们在前面都已经使用过。并且,在后面的两章中,我们也会详细地讲解它们。

首先,我们在goc2p项目中专门建立了一个命令源码文件。这个源码文件在该项目中的相对路径是src/multiproc/socket/tcpsock.go。由于通讯两端的程序都在这一个源码文件中,所以我们完全可以把服务端程序所用的网络协议和地址声明为常量并存放在该文件中。此外,对于作为数据边界的分界符也应该是统一的。注意,如果通讯两端的程序是完全分离的(通常如此),那么最好把这类共用信息存放到第三方的存储介质中并对相关程序提供可访问的接口。

根据上面的描述,我们首先声明了3个常量:

  1. const (
  2. SERVER_NETWORK = "tcp"
  3. SERVER_ADDRESS = "127.0.0.1:8085"
  4. DELIMITER = '\t'
  5. )

可以看出,为了简单起见,我们还是使用单字节字符作为数据边界。

我们为服务端程序和客户端程序各声明了一个入口函数,它们的名称是serverGoclientGo。它们都是不接受任何参数且没有任何结果值的函数。这样做比把它们的代码都堆在命令源码文件的main函数中要好得多。这也是我们遵循单一职责原则的一个表现。另外,独立的入口函数让我们可以任意地选择两端程序的执行方式(并发或串行)。

下面,我们来编写serverGo函数的函数体。首先要做的就是根据给定的网络协议和地址创建一个监听器,代码如下:

  1. var listener net.Listener
  2. listener, err := net.Listen(SERVER_NETWORK, SERVER_ADDRESS)
  3. if err != nil {
  4. printLog("Listen Error: %s\n", err)
  5. return
  6. }
  7. defer listener.Close()
  8. printLog("Got listener for the server. (local address: %s)\n", listener.Addr())

注意,我们在这段代码中加入了一条defer语句,并用它来保证在serverGo函数结束执行之前关闭监听器。这样我们就不用在每条返回语句之前都添加一条listener.Close()语句了。并且,我们也不用担心万一发生运行时恐慌的时候监听器不能被关闭。另外,读者可能已经注意到,在这段代码中有一个名为printLog函数。这个函数实际上使我们为了更好地记录日志而编写的一个辅助函数。它的调用方法与fmt.Printf函数完全一致。这样做是为了分离将来很可能发生变化的日志记录操作。当前,我们仅仅简单地把日志打印到标准输出。但如果我们日后想完善日志的记录格式或者更改日志的记录方式(比如把日志记录到文件或者数据库),那么仅仅改动printLog函数的函数体中的代码就可以了。把可能频繁变化和基本不变的代码分离开来是非常重要的代码编写及优化手段。它可以有效地避免在程序维护过程中的散弹式修改。printLog函数的声明如下:

  1. func printLog(format string, args ...interface{}) {
  2. fmt.Printf("%d: %s", logSn, fmt.Sprintf(format, args...))
  3. logSn++
  4. }

我们通过连用fmt代码包中的两个函数很方便地实现了在原有的日志记录项之上添加内容的功能。其中logSn是我们在当前源码文件中声明的一个包级私有的变量。它代表了每个日志记录项的序号。这纯属是为了我们在后面讲解的日志的时候能够方便一点。

在成功获得到监听器之后,我们就可以开始等待客户端的连接请求了。请看下面的代码:

  1. for {
  2. conn, err := listener.Accept() // 阻塞直至新连接到来
  3. if err != nil {
  4. printLog("Accept Error: %s\n", err)
  5. }
  6. printLog("Established a connection with a client application. (remote address: %s)\n",
  7. conn.RemoteAddr())
  8. go handleConn(conn)
  9. }

请注意for代码块中的最后一条语句。这条语句是一条go语句。它是我们所说的Go语言提供的并发手段之一。go handleConn(conn)语句意味着要启动一个新的Goroutine(或称Go程)来并发的执行handleConn函数。在服务端程序中,这通常是非常有必要的。为了快速、独立地处理已经建立的每一个连接,我们应该尽量让这些处理过程被并发地执行。否则,当我们处理已建立的第一个连接的时候,后续连接就只能排队等待,尽管它们可能已经达到很长时间了。这相当于完全串行地处理众多连接,这样做的效率是非常低下的。并且,只要对其中的某一个连接的处理因某些原因被阻塞了,后续的所有连接就都无法得到处理。这时,服务端程序就等于完全丧失了主要功能。这是非常糟糕的情况。如果阻塞状态永远不被改变,那么这种糟糕的状况也一直会延续下去。因此,对于服务端程序而言,采用并发的方式处理连接是必然的选择。

既然每一个连接都是由handleConn函数处理的,那么我们就来看看怎样编写它的实现。它的简单声明(不包含其函数体)如下:

  1. func handleConn(conn net.Conn)

它仅接受一个代表了连接的net.Conn类型值。由于我们可以把响应数据通过这个net.Conn类型值传递给客户端程序,所以handleConn函数无需再返回结果。另一个更客观的原因是,handleConn函数作为go语句的一部分,即使它返回了结果值也不会有任何意义。实际上,go语句中的函数向调用方传递结果值的方式是与众不同的。我们在后面的章节再说明这种独特的方式。

函数handleConn首先要做的肯定是试图从连接中读取数据。注意,这类读取操作应该处在循环之中。也就是说,服务器端程序应该不断的尝试从已建立的连接中读取数据。这样才能保证尽量及时地处理和响应请求。请看下面的这段代码:

  1. for {
  2. conn.SetReadDeadline(time.Now().Add(10 * time.Second))
  3. strReq, err := read(conn)
  4. if err != nil {
  5. if err == io.EOF {
  6. printLog("The connection is closed by another side. (Server)\n")
  7. } else {
  8. printLog("Read Error: %s (Server)\n", err)
  9. }
  10. break
  11. }
  12. printLog("Received request: %s (Server)\n", strReq)
  13. // 省略若干条语句
  14. }

上面的for代码块并不是handleConn函数的函数体中的全部。我们暂时只在这里展示它的一部分。for代码块中的第一条语句的作用是实现上面所说的关闭闲置连接的功能需求的一部分。其中的SetReadDeadline函数的调用方法我们应该已经很熟悉了。超时错误的发生就意味着当前连接已经可以被判定为闲置连接。这时,我们会记录日志并通过break语句退出当前的for语句块的执行。至于关闭连接的操作,我们在后面会看到。现在接着往下看,第二条语句中read函数也是我们编写的一个辅助函数。该函数的功能是从连接中读取一段以数据分界符为结尾的数据。它的完整声明如下:

  1. func read(conn net.Conn) (string, error) {
  2. readBytes := make([]byte, 1)
  3. var buffer bytes.Buffer
  4. for {
  5. _, err := conn.Read(readBytes)
  6. if err != nil {
  7. return "", err
  8. }
  9. readByte := readBytes[0]
  10. if readByte == DELIMITER {
  11. break
  12. }
  13. buffer.WriteByte(readByte)
  14. }
  15. return buffer.String(), nil
  16. }

我们把readBytes变量的值的长度初始化为1的原因是,防止从连接值中读出多余的数据从而对后续的读取操作造成影响。我们从连接上每读取出一个字节的数据都要检查它是否是数据分界符。如果不是,就继续读取下一个字节。如果是就停止读取并返回结果。这样就不会把当前字节流中的第一个数据分界符后面的数据提前读取出来。如果提前读取发生了,那么我们下一次调用read函数的时候就无法得到一个完整的数据块了。另外,为了暂存当前数据块中的字节,我们用到了一个bytes.Buffer类型值。这通常比使用一个[]byte类型值来存储一个不定长的字节流更加实用和高效。还记得吗?如果当前连接已经被关闭,那么连接值的Read方法在被调用之后会返回一个与io.EOF变量的值相等的错误值。因此,鉴于read函数中对该Read方法返回的错误值的处理方式,我们在调用read函数之后,也应该对其返回的错误值做一样的相等性判断。这一点已经在前面示例中的for代码块中展示出来了。

认真读过前面内容的读者可能会想到通过调用bufio.NewReader函数得到一个针对当前连接的缓冲读取器。如果能想到这一点真的很好。不过对于当前的场景来说,缓冲读取器是不适合的。为什么这么说呢?简单来说,这是由于我们把conn.Read封装在了read函数中。我们先看看使用一个使用了缓冲读取器read函数版本是什么样子的,代码如下:

  1. // 千万不要使用这个版本的read函数!
  2. func read(conn net.Conn) (string, error) {
  3. reader := bufio.NewReader(conn)
  4. readBytes, err := reader.ReadBytes(DELIMITER)
  5. if err != nil {
  6. return "", err
  7. }
  8. return string(readBytes[:len(readBytes)-1]), nil
  9. }

这很诱人。因为这个版本的read函数减少了一多半的代码。但是,这里面却埋藏了一个陷阱。这与缓冲读取器中的缓存机制有关。在很多时候,它会读取比足够多更多一点的数据到其中的缓冲区中。这就产生了我们前面提到的提前读取的问题。当然,如果我们每次都从同一个缓冲读取器中读取数据块的话,肯定是没有问题的。但是,在这里,我们对read函数的每一次调用都会导致一个新的针对当前连接的缓冲读取器被创建出来。我们实际上是在使用不同的缓冲读取器试图从同一个连接上读取的数据。这显然会造成一些问题,因为没有任何机制来协调它们的读取操作。本应留给后面的缓冲读取器读取的数据却被提前读取到了前面的缓冲读取器的缓冲区中。并且,由于我们不会再使用前面的这些缓冲读取器读取数据,所以这些被提前读取的数据实际上是被废弃了。这不但会导致一些数据块的不完整,甚至还可能会使一些数据块被漏掉。对于像本示例中的这种长度很短的小数据块而言更是如此。综上所述,我们决不能使用这个版本的read函数!不过,如果我们确实需要使用缓冲读取器也不是没有办法。方法很简单,即我们删除掉read函数,直接在for代码块之前初始化一个缓冲读取器,并且保证在for循环中总是使用同一个缓冲读取器来读取数据。这不但可以规避之前提到的所有问题,也可以避免多次创建缓冲读取器所带来的资源浪费。读者可以沿着这一思路尝试对现有的这个for代码块进行重构。

如果读者刚才去重构for语句块了,那么真的值得一赞。现在,我们接着看for语句块中的第二部分:

  1. for {
  2. // 省略若干条语句
  3. i32Req, err := convertToInt32(strReq)
  4. if err != nil {
  5. n, err := write(conn, err.Error())
  6. if err != nil {
  7. printLog("Write Error (written %d bytes): %s (Server)\n", err)
  8. }
  9. printLog("Sent response (written %d bytes): %s (Server)\n", n, err)
  10. continue
  11. }
  12. f64Resp := cbrt(i32Req)
  13. respMsg := fmt.Sprintf("The cube root of %d is %f.", i32Req, f64Resp)
  14. n, err := write(conn, respMsg)
  15. if err != nil {
  16. printLog("Write Error: %s (Server)\n", err)
  17. }
  18. printLog("Sent response (written %d bytes): %s (Server)\n", n, respMsg)
  19. }

这部分的代码实现的功能是检查数据块是否可以被转换为一个int32类型值,如果能被转换就立即计算它的立方根,否则就向客户端程序发送一条错误信息。其中,convertToInt32函数实现了尝试转换数据块的功能,而cbrt被用于计算立方根。它们的函数体中涉及了一些与Socket无关的代码包的使用。因此,我们在这里略过对它们的讲解。不过,我们有必要简要说明一下write函数。它的声明如下:

  1. func write(conn net.Conn, content string) (int, error) {
  2. var buffer bytes.Buffer
  3. buffer.WriteString(content)
  4. buffer.WriteByte(DELIMITER)
  5. return conn.Write(buffer.Bytes())
  6. }

有了编写read函数的经验,我们编写write函数会很轻松。同样是使用一个bytes.Buffer类型值暂存数据,不过这次存储的是将要发送出去的数据,而不是已经接收到的数据。bytes.Buffer类型针对不同形式的数据提供了不同的写入方法,这很方便。注意,我们在每次发送的数据的后面都要追加一个数据分界符。这样才能形成一个两端程序均可识别的数据块。另外,bytes.Buffer类型值的Bytes方法会把其中存储的所有数据以字节切片的形式返回给调用方。我们正好可以用这个字节切片作为conn.Write方法的参数值。由于write函数的结果声明列表与conn.Write方法的完全相同,所以在write函数的最后我们直接返回后者的结果就可以了。

回到for代码块中。在数据块转换出错的情况下,我们直接把错误信息发送给了客户端程序,并根据发送的结果记录了日志。之后,我们要做的就是读取下一个数据块并试图转换它。这就是紧接在后面的continue语句所起到的作用——放弃执行后面的语句并开始下一次迭代。而如果数据转换成功,我们就会计算数据块代表的int32类型值的的立方根。然后就是生成结果描述并把它发送给客户端程序。发送操作同样用到了write函数。

至此,handleConn函数的主体(那个for代码块)已经被我们实现了。不过,还要注意,当它执行结束的时候应该把连接关闭。它被结束执行可能是由于主体已经执行结束,也可能是某些代码引发了一个运行时恐慌。不论怎样,把当前连接及时关闭掉都是一件很重要的事情。这也捎带着满足了关闭闲置连接的需求。还记得吗?当当前连接被判断为闲置连接的时候,read函数会返回非nil的错误值,并且那个for代码块中唯一的一条break语句会被执行。在这种情况下,我们肯定会用到defer语句,像这样:

  1. defer conn.Close()

为了最大程度地保证连接的及时关闭,我们应该把这条defer语句放置在handleConn函数的函数体的开始处。

好了,handleConn函数和serverGo函数的编写已基本完成。下面我们来编写clientGo函数。clientGo函数的简单声明是这样的:

  1. func clientGo(id int)

该函数接受一个名为id的、int类型的参数。接受这样一个参数只是因为要在运行多个客户端程序的场景下在日志中区分它们。我们不用太过在意它。clientGo函数应该首先试图与服务端程序建立连接,代码如下:

  1. conn, err := net.DialTimeout(SERVER_NETWORK, SERVER_ADDRESS, 2*time.Second)
  2. if err != nil {
  3. printLog("Dial Error: %s (Client[%d])\n", err, id)
  4. return
  5. }
  6. defer conn.Close()
  7. printLog("Connected to server. (remote address: %s, local address: %s) (Client[%d])\n",
  8. conn.RemoteAddr(), conn.LocalAddr(), id)
  9. time.Sleep(200 * time.Millisecond)

可以看到,如果连接不成功,那么就在记录日志之后直接返回。这也就意味着客户端程序的执行的结束。要使连接成功,最基本的条件就是连接操作应该在服务端程序已经启动的情况下进行。由于客户端程序和服务端程序处在同一个命令源码文件中,所以这需要一点小技巧。这个我们稍后再讲。

下面那一条defer语句保证了在clientGo函数的执行将要结束的时候当前的连接会被关闭。这对于两端的程序都是有好处的。另外,解释一下,让客户端程序“睡眠”200毫秒纯属是为了两端程序记录的日志看起来更清晰一些。因为它们会出现在同一台计算机的标准输出上。

现在一切准备就绪,我们开始编写发送请求数据的代码。为了让两端的程序在体现出它们的功能和整体流程之后就结束执行,我们把客户端程序发送的请求数据块的数量定为5个。这需要声明一个变量,以便在发送数据块和接收数据块的时候都以此作为迭代次数。另外,为了满足检验服务端程序的响应速度的需求,我们还要在发送和接收操作开始之前设置一下超时时间。据此,有两行代码需要首先被编写出来,像这样:

  1. requestNumber := 5
  2. conn.SetDeadline(time.Now().Add(5 * time.Millisecond))

发送数据块的代码并不难编写,我们已经在实现serverGo函数的时候编写过类似的代码了。用来发送数据块的for代码块如下:

  1. for i := 0; i < requestNumber; i++ {
  2. i32Req := rand.Int31()
  3. n, err := write(conn, fmt.Sprintf("%d", i32Req))
  4. if err != nil {
  5. printLog("Write Error: %s (Client[%d])\n", err, id)
  6. continue
  7. }
  8. printLog("Sent request (written %d bytes): %d (Client[%d])\n", n, i32Req, id)
  9. }

其中,标准库代码包rand中的函数Int31可以生成一个随机的int32类型值。使用这样的随机值,既可以省去我们专门设计一个整数序列的时间,也可以更充分地体现出服务端程序的效能。读者还记得fmt.Sprintf函数吗?它可以把任意个任意类型的值转换为具有给定格式的string类型值。在很多时候,这比使用其他转换方法更加简单方便。在这里,我们用它来把一个int32类型值转换成一个string类型值。至于此for代码块中的其他语句,应该就不用更多的解释了吧?如果读者忘记了它们的含义和作用,那就再看一眼前面对handleConn函数的说明吧。

在把所有的5个请求数据块都发送出去之后,客户端程序应该开始着手接收响应数据块的事情了。实现这一功能的代码与服务端程序中接收请求数据块的代码如出一辙。我们要把这些代码放置在一个for代码块里面。这是因为我们需要在接收到所有预期的响应数据块之后,及时关闭当前连接并结束客户端程序的执行。这里所说的for代码块是这样的:

  1. for j := 0; j < requestNumber; j++ {
  2. strResp, err := read(conn)
  3. if err != nil {
  4. if err == io.EOF {
  5. printLog("The connection is closed by another side. (Client[%d])\n", id)
  6. } else {
  7. printLog("Read Error: %s (Client[%d])\n", err, id)
  8. }
  9. break
  10. }
  11. printLog("Received response: %s (Client[%d])\n", strResp, id)
  12. }

在编写完serverGoclientGo以及相关函数之后,我们还需要考虑一件事情,那就是怎么样协调服务端程度、客户端程序以及main函数的执行。只要main函数的执行结束了,当前的这个与命令源码文件tcpsock.go对应的进程也就会随即消失。所以,我们要让main函数等待serverGo函数和clientGo函数都执行完毕之后再结束执行。这样,我们就要使用到前面提到过的sync.WaitGroup类型值了。为了让服务端程序和客户端程序都能使用到该值,我们要把该值声明为一个全局变量。当然,为了遵循开放封闭原则,该变量应该是包级私有的。据此,这个变量的声明如下:

  1. var wg sync.WaitGroup

现在,我们开始编写main函数的函数体。为了让服务端程序和客户端程序能够并发地运行,我们应该分别使用go语句执行serverGo函数和clientGo函数。并且,客户端程序运行的时机应该在服务端程序开始运行并已准备好接收新连接之后。因此,我们应该让这两条go语句的执行之间有一点时间间隔。500毫秒的间隔时间在这里是足够的。根据上面的简单分析,main函数的第一个版本是这样的:

  1. func main() {
  2. go serverGo()
  3. time.Sleep(500 * time.Millisecond)
  4. go clientGo(1)
  5. }

要使我们前面声明的变量wg能够真正起到作用,我们还需要对现有的serverGo函数、clientGo函数以及第一个版本的main函数进行改造。下面是改造后的main函数:

  1. func main() {
  2. wg.Add(2)
  3. go serverGo()
  4. time.Sleep(500 * time.Millisecond)
  5. go clientGo(1)
  6. wg.Wait()
  7. }

如果我们只运行一个服务端程序和一个客户端程序的话,我们调用wgAdd方法的时候应该以2作为参数。这表示main函数只需等待上述两个程序运行完毕后即可。注意,该调用语句必须出现在这两个程序被运行之前。另外,我们在这里说的“等待”的操作是由调用语句wg.Wait()代表的。

如果我们不对serverGo函数和clientGo函数做出修改,当main函数被执行的时候,它将会永远在wg.Wait()语句处被阻塞。至于原因,我们在第一次接触sync.WaitGroup类型值的时候(6.2.4节)已经有所说明。我们要在serverGo函数和clientGo函数的函数体的最前面都加入一条语句:

  1. defer wg.Done()

这样,在这两个函数都被执行结束的时候main函数即可从wg.Wait()语句处继续往下执行了。

至此,在tcpsock.go文件中的示例的编码工作全部完成。现在我们来运行一下这个示例。在运行该示例之后标准输出上将出现如下内容:

  1. 1: Got listener for the server. (local address: 127.0.0.1:8085)
  2. 2: Connected to server. (remote address: 127.0.0.1:8085, local address: 127.0.0.1:51036) (Client[1])
  3. 3: Established a connection with a client application. (remote address: 127.0.0.1:51036)
  4. 4: Sent request (written 11 bytes): 1298498081 (Client[1])
  5. 5: Sent request (written 11 bytes): 2019727887 (Client[1])
  6. 6: Sent request (written 11 bytes): 1427131847 (Client[1])
  7. 7: Sent request (written 10 bytes): 939984059 (Client[1])
  8. 8: Sent request (written 10 bytes): 911902081 (Client[1])
  9. 9: Received request: 1298498081 (Server)
  10. 10: Sent response (written 44 bytes): The cube root of 1298498081 is 1090.972418. (Server)
  11. 11: Received request: 2019727887 (Server)
  12. 12: Sent response (written 44 bytes): The cube root of 2019727887 is 1264.050100. (Server)
  13. 13: Received request: 1427131847 (Server)
  14. 14: Sent response (written 44 bytes): The cube root of 1427131847 is 1125.869444. (Server)
  15. 15: Received request: 939984059 (Server)
  16. 16: Sent response (written 42 bytes): The cube root of 939984059 is 979.580571. (Server)
  17. 17: Received request: 911902081 (Server)
  18. 18: Sent response (written 42 bytes): The cube root of 911902081 is 969.726809. (Server)
  19. 19: Received response: The cube root of 1298498081 is 1090.972418. (Client[1])
  20. 20: Received response: The cube root of 2019727887 is 1264.050100. (Client[1])
  21. 21: Received response: The cube root of 1427131847 is 1125.869444. (Client[1])
  22. 22: Received response: The cube root of 939984059 is 979.580571. (Client[1])
  23. 23: Received response: The cube root of 911902081 is 969.726809. (Client[1])
  24. 24: The connection is closed by another side. (Server)

从这24个日志记录项中,我们可以清晰地看到两个程序共同完成的示例流程。第1~3个日志记录项反应出了服务端程序的启动过程,以及它与唯一的一个客户端程序的连接过程。第4~8个日志记录项表示该客户端程序连续向服务端程序发送了5个请求数据块。第9个和10个日志记录项表示服务端程序接到了第1个请求数据块,并在进行相应处理后向客户端程序发送了相应的结果描述。在这之后的8项日志记录则体现了服务端程序对之后到达的4个请求数据块的处理情况。而证明客户端程序已收到全部的5个结果描述的是第19~23个日志记录项。最后一项日志记录是服务端程序发出的,它表明了客户端程序在收到所有结果描述之后主动地关闭了与服务端程序建立的连接。综上所述,这些日志记录项所体现出的流程细节均正如我们所愿。

读者应该能从这个完整示例中学习到怎样使用Go语言提供的Socket编程API编写能够相互通讯的程序。虽然我们的示例将服务端程序和客户端程序置于同一个进程之中,但是在绝大多数应用场景中通讯两端的程序是由不同的进程代表的。在很多情形下,它们往往不是在同一台计算机上甚至不在同一个子网络中的两个程序。可以说,使用Socket接口的程序可以在网络中的任何地方与另一个同类程序进行通讯。并且,这些程序可以是由不同的编程语言编写的。

在Go语言标准库中,一些实现了某种网络通讯功能的代码包都是以net代码包所提供的Socket编程API为基础的。其中最有代表性的就是net/http代码包。它以此为基础实现了TCP/IP协议栈的应用层协议HTTP,并为我们提供了非常好用的API。这些API让我们可以非常方便地编写出满足一般性需求的Web应用程序。

net代码包之外,标准库代码包net/rpc中的API为我们提供了在两个Go语言程序之间建立通讯和交换数据的另一个种方式。这种方式被称为远程过程调用(Remote Procedure Call)。这个代码包中的程序也是基于TCP/IP协议栈的。它们也使用到了net包以及net/http包提供的API。