面向流消费者的API


流可以是可读(Readable)、可写(Writable),或者兼具两者(Duplex,双工)的。

所有流都是 EventEmitter 的实例,但它们也有其它自定义的方法和属性,这取决于它们是 可读(Readable)、可写(Writable) 或 双工(Duplex)。

如果一个流既可读(Readable)也可写(Writable),那么它就实现了(流的)所有方法和事件。因此,双工(Duplex)或转换(Transform)流充分诠释了这些 API ,虽然它们的实现可能有所不同。

没有必要在你的程序里的消费者流中实现流接口。如果你正在自己的程序中实现流接口,请同时参阅面向实现者的API

几乎所有 Node.js 程序,无论多么简单,都以某种方式使用了流。这里有一个使用流的 Node.js 程序的例子:

  1. const http = require('http');
  2. var server = http.createServer((req, res) => {
  3. // req is an http.IncomingMessage, which is a Readable Stream
  4. // res is an http.ServerResponse, which is a Writable Stream
  5. var body = '';
  6. // we want to get the data as utf8 strings
  7. // If you don't set an encoding, then you'll get Buffer objects
  8. req.setEncoding('utf8');
  9. // Readable streams emit 'data' events once a listener is added
  10. req.on('data', (chunk) => {
  11. body += chunk;
  12. });
  13. // the end event tells you that you have entire body
  14. req.on('end', () => {
  15. try {
  16. var data = JSON.parse(body);
  17. } catch (er) {
  18. // uh oh! bad json!
  19. res.statusCode = 400;
  20. return res.end(`error: ${er.message}`);
  21. }
  22. // write back something interesting to the user:
  23. res.write(typeof data);
  24. res.end();
  25. });
  26. });
  27. server.listen(1337);
  28. // $ curl localhost:1337 -d '{}'
  29. // object
  30. // $ curl localhost:1337 -d '"foo"'
  31. // string
  32. // $ curl localhost:1337 -d 'not json'
  33. // error: Unexpected token o

stream.Readable类

可读(Readable)流接口是你正在读取的一个数据来源的抽象。换言之,数据出自一个可读(Readable)流。

一个可读(Readable)流在你表明你准备接收前将不会开始发出数据。

可读(Readable)流有两种“模式”:流动模式暂停模式。当处于流动模式时,数据由底层系统读出,并尽可能快地提供给你的程序;当处于暂停模式时,你必须明确地调用 stream.read() 来取出数据块。流开始处于暂停模式。

注意:如果没有绑定数据事件处理器,并且没有 stream.pipe() 目标,同时流被切换到流动模式,那么数据将会流失。

你可以通过下面几种做法切换到流动模式:

你可以通过下面的一种做法切换回暂停模式:

  • 如果没有导流目标,可以调用 stream.pause() 方法。

  • 如果有导流目标,移除所有 ‘data’ 事件 处理器,并通过调用 stream.unpipe() 方法移除所有导流目标。

请注意,为了向后兼容考虑,移除 ‘data’ 事件 处理器并不会自动暂停流。同样的,当有导流目标时,调用 stream.pause() 并不能保证流在那些目标排空并请求更多数据时维持暂停状态。

可读(Readable)流的例子包括:

readable事件

当可以从流中读取数据块时,它就会触发一个 'readable' 事件。

在某些情况下,假如未准备好,监听一个 'readable' 事件会使一些数据从底层系统被读出到内部缓冲区中。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('readable', () => {
  3. // there is some data to read now
  4. });

当内部缓冲区被排空后,一旦有更多数据可用时,会再次触发一个 'readable' 事件。

'readable' 事件不会在“流动”模式中触发的最后唯一一个例外是在流的末尾。

'readable' 事件表明流有新的信息:无论新的数据是可用的,还是已经到达流的末尾。在前者的情况下,stream.read() 将会返回那些数据。在后者的情况下,stream.read() 将会返回 null 。例如,在下面的例子中,foo.txt 是一个空文件:

  1. const fs = require('fs');
  2. var rr = fs.createReadStream('foo.txt');
  3. rr.on('readable', () => {
  4. console.log('readable:', rr.read());
  5. });
  6. rr.on('end', () => {
  7. console.log('end');
  8. });

运行此脚本的输出:

  1. $ node test.js
  2. readable: null
  3. end

data事件

  • chunk {Buffer} | {String} 数据块

绑定一个 'data' 事件监听器到一个未被明确暂停的流上,流会被切换到流动模式。数据只要可用就会被传递。

如果你想尽可能快地从流中取出所有的数据,这是最佳的方式。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('data', (chunk) => {
  3. console.log('got %d bytes of data', chunk.length);
  4. });

end事件

这个事件会在没有更多的数据可读时触发。

请注意,'end' 事件在数据被完全消费之前不会被触发。它可以在切换到流动模式后或直到你到达末端前通过不停地调用 stream.read() 时被完成。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('data', (chunk) => {
  3. console.log('got %d bytes of data', chunk.length);
  4. });
  5. readable.on('end', () => {
  6. console.log('there will be no more data.');
  7. });

close事件

当流和底层数据源(比如,文件描述符)被关闭时触发。并不是所有流都会触发这个事件。这个事件表明没有更多的事件将被触发,并且不会做进一步的计算。

并不是所有的流都会触发 'close' 事件。

error事件

  • {Error}

在接收数据出错时触发。

readable.read([size])

  • size {Number} 可选参数,指定要读取多少数据。

  • 返回:{String} | {Buffer} | {Null}

read() 方法从内部缓冲区中拉取并返回一些数据。当没有数据可用,它会返回 null

如果你传了一个 size 参数,那么它就会返回多少字节的数据。如果 size 字节不可用,那么它将返回 null,除非已经到了数据末端,在这种情况下,它将返回保留在缓冲区中的数据。

如果你没有指定 size 参数,那么它就会返回内部缓冲区中的所有数据。

该方法应该仅在暂停模式下被调用。在流动模式下,该方法会被自动调用直到内部缓冲区排空。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('readable', () => {
  3. var chunk;
  4. while (null !== (chunk = readable.read())) {
  5. console.log('got %d bytes of data', chunk.length);
  6. }
  7. });

如果该方法返回了一个数据块,那么它也会触发 ‘data’ 事件。

请注意,在 ‘end’ 事件触发后调用 stream.read([size]) 将会返回 null,并且不会产生错误警告。

readable.setEncoding(encoding)

  • encoding {String} 要使用的编码。

  • 返回:this

调用此函数会使得流返回指定编码的字符串而不是 Buffer 对象。例如,如果你使用 readable.setEncoding('utf8'),那么输出数据会被作为 UTF-8 数据解析,并作为字符串返回。如果你 readable.setEncoding('hex'),那么数据会被编码成十六进制字符串格式。

该方法能妥善处理多字节字符,如果你直接取出 Buffer 并对它们调用 buf.toString(encoding),很可能会导致字节错位。如果你想要以字符串形式读取数据,请始终使用该方法。

你还可以使用 readable.setEncoding(null) 完全禁用任何编码。如果你在处理二进制数据或将大型的多字节字符串分成多块时,这种做法将非常有用。

  1. var readable = getReadableStreamSomehow();
  2. readable.setEncoding('utf8');
  3. readable.on('data', (chunk) => {
  4. assert.equal(typeof chunk, 'string');
  5. console.log('got %d characters of string data', chunk.length);
  6. });

readable.pipe(destination[, options])

  • destination {stream.Writable} 写入数据的目标

  • options {Object} Pipe 选项

    • end {Boolean} 当读取结束时终止写入,默认为 true

该方法从可读流中拉取所有数据,并写入到所提供的目标。该方法能自动控制流量以避免目标被快速读取的可读流所淹没。

可以安全地导流到多个目标。

  1. var readable = getReadableStreamSomehow();
  2. var writable = fs.createWriteStream('file.txt');
  3. // All the data from readable goes into 'file.txt'
  4. readable.pipe(writable);

该函数返回目标的流,因此你可以建立像这样的导流链:

  1. var r = fs.createReadStream('file.txt');
  2. var z = zlib.createGzip();
  3. var w = fs.createWriteStream('file.txt.gz');
  4. r.pipe(z).pipe(w);

例如,模拟 Unix 的 cat 命令:

  1. process.stdin.pipe(process.stdout);

默认情况下,当源数据流触发 ‘end’ 事件时,目标的 stream.end() 会被调用,因此 destination 不再可写。传入 {end: false} 作为 options 可以保持目标流的开启状态。

这让 writer 保持开启,因此最后可以写入 “Goodbye”。

  1. reader.pipe(writer, {
  2. end: false
  3. });
  4. reader.on('end', () => {
  5. writer.end('Goodbye\n');
  6. });

请注意 process.stderrprocess.stdout 在进程结束前都不会被关闭,无论是否指定选项。

readable.unpipe([destination])

  • destination {stream.Writable} 可选,指定解除导流的流

该方法会移除之前调用 stream.pipe() 所设的钩子。

如果没有指定目标,那么将移除所有的管道。

如果指定了目标,但并没有与之建立导流,那么什么事都不会发生。

  1. var readable = getReadableStreamSomehow();
  2. var writable = fs.createWriteStream('file.txt');
  3. // All the data from readable goes into 'file.txt',
  4. // but only for the first second
  5. readable.pipe(writable);
  6. setTimeout(() => {
  7. console.log('stop writing to file.txt');
  8. readable.unpipe(writable);
  9. console.log('manually close the file stream');
  10. writable.end();
  11. }, 1000);

readable.unshift(chunk)

  • chunk {Buffer} | {String} 回读队列开头的数据块

该方法在某些情况下很有用,比如一个流正在被一个解析器消费,解析器需要“逆消费”某些刚从源中拉取出来的数据,以便流可以传递给其它消费者。

请注意,stream.unshift(chunk) 不能在 ‘end’ 事件 触发后调用,否则将产生一个运行时错误。

如果你发现你必须在你的程序中频繁调用 stream.unshift(chunk) ,请考虑实现一个转换(Transform)流作为替代。(详见面向流实现者的 API

  1. // Pull off a header delimited by \n\n
  2. // use unshift() if we get too much
  3. // Call the callback with (error, header, stream)
  4. const StringDecoder = require('string_decoder').StringDecoder;
  5. function parseHeader(stream, callback) {
  6. stream.on('error', callback);
  7. stream.on('readable', onReadable);
  8. var decoder = new StringDecoder('utf8');
  9. var header = '';
  10. function onReadable() {
  11. var chunk;
  12. while (null !== (chunk = stream.read())) {
  13. var str = decoder.write(chunk);
  14. if (str.match(/\n\n/)) {
  15. // found the header boundary
  16. var split = str.split(/\n\n/);
  17. header += split.shift();
  18. var remaining = split.join('\n\n');
  19. var buf = new Buffer(remaining, 'utf8');
  20. if (buf.length)
  21. stream.unshift(buf);
  22. stream.removeListener('error', callback);
  23. stream.removeListener('readable', onReadable);
  24. // now the body of the message can be read from the stream.
  25. callback(null, header, stream);
  26. } else {
  27. // still reading the header.
  28. header += str;
  29. }
  30. }
  31. }
  32. }

请注意,不像 stream.push(chunk) 那样,stream.unshift(chunk) 不会通过重置流的内部读取状态结束读取过程。如果在读取过程(比如,在一个 stream._read() 内部实现一个自定义流)中调用 unshift() 将导致意想不到的结果。在调用 unshift() 后立即调用 stream.push(‘’) 会适当地重置读取状态,然而最好简单地避免在执行一个读出过程中调用 unshift()

readable.pause()

  • 返回:this

该方法会使一个处于流动模式的流停止触发 ‘data’ 事件,切换到非流动模式,并让后续可用数据留在内部缓冲区中。

  1. var readable = getReadableStreamSomehow();
  2. readable.on('data', (chunk) => {
  3. console.log('got %d bytes of data', chunk.length);
  4. readable.pause();
  5. console.log('there will be no more data for 1 second');
  6. setTimeout(() => {
  7. console.log('now data will start flowing again');
  8. readable.resume();
  9. }, 1000);
  10. });

readable.isPaused()

  • 返回:{Boolean}

这个方法返回是否 readable 已通过客户端代码被明确暂停(在不存在相应的 stream.resume() 的情况下使用 stream.pause()

  1. var readable = new stream.Readable
  2. readable.isPaused() // === false
  3. readable.pause()
  4. readable.isPaused() // === true
  5. readable.resume()
  6. readable.isPaused() // === false

readable.resume()

  • 返回:this

该方法让可读流恢复触发 ‘data’ 事件。

该方法会将流切换到流动模式。如果你想从流中消费数据,但你想得到它的 ‘end’ 事件,你可以调用 stream.resume() 来开启数据流。

  1. var readable = getReadableStreamSomehow();
  2. readable.resume();
  3. readable.on('end', () => {
  4. console.log('got to the end, but did not read anything');
  5. });

readable.wrap(stream)

  • stream {Stream} 一个“旧式”可读流

Node.js v0.10 版本之前的流并未实现现今所有流 API。(更多信息详见“兼容性”章节。)

如果你正在使用一个早期版本的 Node.js 库,它会触发 ‘data’ 事件并且有一个仅作查询用途的 stream.pause() 方法,那么你可以使用 wrap() 方法来创建一个使用旧式流作为数据源的可读(Readable)流。

你可能很少需要用到这个函数,但它会作为与旧 Node.js 程序和库交互的简便方法存在。

例如:

  1. const OldReader = require('./old-api-module.js').OldReader;
  2. const Readable = require('stream').Readable;
  3. const oreader = new OldReader;
  4. const myReader = new Readable().wrap(oreader);
  5. myReader.on('readable', () => {
  6. myReader.read(); // etc.
  7. });

stream.Writable类

可写(Writable)流接口是一个你正在写入数据的目标的抽象。

可写流的例子包括:

pipe事件

  • src {stream.Readable} 被导流到该可写流的来源流

每当 stream.pipe() 方法在一个可读流上被调用并添加当前可写流到所设定的目标时触发。

  1. var writer = getWritableStreamSomehow();
  2. var reader = getReadableStreamSomehow();
  3. writer.on('pipe', (src) => {
  4. console.error('something is piping into the writer');
  5. assert.equal(src, reader);
  6. });
  7. reader.pipe(writer);

unpipe事件

  • src {stream.Readable} 被解除导流到该可写流的来源流

每当 stream.unpipe() 方法在一个可读流上被调用并移除当前可写流到所设定的目标时触发。

  1. var writer = getWritableStreamSomehow();
  2. var reader = getReadableStreamSomehow();
  3. writer.on('unpipe', (src) => {
  4. console.error('something has stopped piping into the writer');
  5. assert.equal(src, reader);
  6. });
  7. reader.pipe(writer);
  8. reader.unpipe(writer);

drain事件

如果一个 stream.write(chunk) 调用返回 false 时,那么 'drain' 事件将表明什么时候适合开始向流中写入更多的数据。

  1. // Write the data to the supplied writable stream one million times.
  2. // Be attentive to back-pressure.
  3. function writeOneMillionTimes(writer, data, encoding, callback) {
  4. var i = 1000000;
  5. write();
  6. function write() {
  7. var ok = true;
  8. do {
  9. i -= 1;
  10. if (i === 0) {
  11. // last time!
  12. writer.write(data, encoding, callback);
  13. } else {
  14. // see if we should continue, or wait
  15. // don't pass the callback, because we're not done yet.
  16. ok = writer.write(data, encoding);
  17. }
  18. } while (i > 0 && ok);
  19. if (i > 0) {
  20. // had to stop early!
  21. // write some more once it drains
  22. writer.once('drain', write);
  23. }
  24. }
  25. }

finish事件

stream.end() 方法被调用,并且所有数据已强制写入到底层系统时,此事件会被触发。

  1. var writer = getWritableStreamSomehow();
  2. for (var i = 0; i < 100; i++) {
  3. writer.write('hello, #${i}!\n');
  4. }
  5. writer.end('this is the end\n');
  6. writer.on('finish', () => {
  7. console.error('all writes are now complete.');
  8. });

error事件

  • {Error}

当写入或导流数据出现错误时触发。

writable.write(chunk[, encoding][, callback])

  • chunk {String} | {Buffer} 要写入的数据

  • encoding {String} 当前编码,如果 chunk 是一个字符串

  • callback {Function} 当数据块被强制写入时回调

  • 返回:{Boolean} 如果数据被完全处理时返回 true

该方法将一些数据写入到底层系统中,并且一旦数据已完全处理,就会调用所提供的回调。如果发生错误,则回调可能会也可能不会将错误作为第一个参数调用。为了检测写入错误,请监听 ‘error’ 事件。

返回值表示你是否应该立即继续写入。如果数据已被滞留在内部,那么它会返回 false 。否则,它会返回 true

这个返回值仅供参考。你可以继续写入,即使它返回 false 。然而,写入的数据会被滞留在内存中,所以最好不要过分地这么做。最好的做法是等待 ‘drain’ 事件发生后再继续写入更多数据。

writable.setDefaultEncoding(encoding)

  • encoding {String} 新的默认编码

设置一个可写流的默认编码。

writable.cork()

强制滞留所有的写入。

滞留的数据会在调用 stream.uncork()stream.end() 时被强制写入。

writable.uncork()

强制写入所有调用 stream.cork() 后滞留的数据。

writable.end([chunk][, encoding][, callback])

  • chunk {String} | {Buffer} 可选,要写入的数据

  • encoding {String} 当前编码,如果 chunk 是一个字符串

  • callback {Function} 可选,当流完成时回调

当没有更多数据会被写入到流时调用此方法。如果提供,该回调会作为 ‘finish’ 事件的一个附加的监听器。

在调用 stream.end() 后调用 stream.write() 会引发错误。

  1. // write 'hello, ' and then end with 'world!'
  2. var file = fs.createWriteStream('example.txt');
  3. file.write('hello, ');
  4. file.end('world!');
  5. // writing more now is not allowed!

stream.Duplex类

双工(Duplex)流是同时实现了可读(Readable)和可写(Writable)接口的流。

双工(Duplex)流的例子包括:

stream.Transform类

转换(Transform)流是一种输出由输入计算所得的双工(Duplex)流。他们同时实现了可读(Readable)和可写(Writable)接口。

转换(Transform)流的例子包括: