Backpressuring in Streams

A general problem called backpressure can occur during data handling: data builds up behind a buffer during a transfer. When the receiving end of the transfer performs complex operations, or is slower for whatever reason, data from the incoming source tends to accumulate, like a clog.

To solve this problem, there must be a delegation system in place to ensure a smooth flow of data from one source to another. Different communities have solved it in ways suited to their own programs; Unix pipes and TCP sockets are good examples, and the technique is often referred to as flow control. In Node.js, streams have been the adopted solution.
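As a minimal sketch of how a Node.js stream signals this buildup, the snippet below writes to a deliberately slow Writable. The 4-byte highWaterMark, the 10 ms delay, and the createSlowWritable helper are illustrative assumptions rather than values taken from Node.js itself. write() returns false once the buffered data reaches highWaterMark, and the 'drain' event reports when it is safe to resume.

```javascript
const { Writable } = require('node:stream');

// Hypothetical slow destination: each chunk takes 10 ms to "process".
// The tiny highWaterMark (4 bytes) only serves to make the effect visible.
function createSlowWritable() {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      setTimeout(callback, 10);
    },
  });
}

const slow = createSlowWritable();

// write() returns false once the buffered data reaches highWaterMark,
// which is the signal for the producer to stop sending.
const firstWrite = slow.write('ab'); // 2 bytes pending, below the limit
const secondWrite = slow.write('cdef'); // 6 bytes pending, over the limit

console.log(firstWrite, secondWrite); // prints: true false

// 'drain' fires once the buffer has emptied and writing may resume.
slow.once('drain', () => {
  console.log('drained, safe to write again');
  slow.end();
});
```

A producer that ignores the false return value keeps working, but the unsent data piles up in memory, which is exactly the clog described above.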

The purpose of this guide is to further detail what backpressure is, and how exactly streams address this in Node.js' source code. The second part of the guide will introduce suggested best practices to ensure your application's code is safe and optimized when implementing streams.

We assume a little familiarity with the general definition of backpressure, Buffer, and EventEmitters in Node.js, as well as some experience with Stream. If you haven't read through those docs, it's not a bad idea to take a look at the API documentation first, as it will help expand your understanding while reading this guide.

The Problem with Data Handling

In a computer system, data is transferred from one process to another through pipes, sockets, and signals. In Node.js, we find a similar mechanism called Stream. Streams are great! They do so much for Node.js and almost every part of the internal codebase utilizes that module. As a developer, you are more than encouraged to use them too!

const readline = require('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

rl.question('Why should you use streams? ', answer => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

Why the backpressure mechanism implemented through streams is such a useful optimization can be demonstrated by comparing a system tool with Node.js' Stream implementation.

In one scenario, we will take a large file (approximately 9 GB) and compress it using the familiar zip(1) tool.

zip The.Matrix.1080p.mkv

While that will take a few minutes to complete, in another shell we may run a script that uses Node.js' zlib module, which wraps around another compression tool, gzip(1).

const fs = require('node:fs');
const gzip = require('node:zlib').createGzip();

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

To test the results, try opening each compressed file. The file compressed by the zip(1) tool will notify you the file is corrupt, whereas the compression finished by Stream will decompress without error.

In this example, we use .pipe() to get the data source from one end to the other. However, notice there are no proper error handlers attached. If a chunk of data were to fail to be properly received, the Readable source or gzip stream will not be destroyed. pump is a utility tool that would properly destroy all the streams in a pipeline if one of them fails or closes, and is a must-have in this case!
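The missing cleanup can be observed with a small sketch. The streams below are hypothetical stand-ins for the file and gzip streams (an endless in-memory source and a destination that fails on its first chunk), and the helper names are made up for illustration. After the destination errors, the source piped into it is still not destroyed.

```javascript
const { Readable, Writable } = require('node:stream');

// Hypothetical source that never runs out of data.
function createEndlessSource() {
  return new Readable({
    read() {
      this.push('chunk');
    },
  });
}

// Hypothetical destination that fails on the first chunk it receives.
function createFailingDestination() {
  return new Writable({
    write(chunk, encoding, callback) {
      callback(new Error('disk full'));
    },
  });
}

// Pipes source into destination and reports whether the source was
// destroyed once the destination has emitted its error.
function pipeAndObserve() {
  return new Promise(resolve => {
    const source = createEndlessSource();
    const destination = createFailingDestination();

    destination.on('error', () => {
      // Wait one turn of the event loop so any automatic cleanup could run.
      setImmediate(() => {
        resolve({ sourceDestroyed: source.destroyed });
        source.destroy(); // the manual cleanup that .pipe() left to us
      });
    });

    source.pipe(destination);
  });
}

pipeAndObserve().then(state => console.log(state));
// logs: { sourceDestroyed: false }
```

With real file streams, a source left in this state keeps its file descriptor open until something destroys it explicitly.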

pump is only necessary for Node.js 8.x or earlier; Node.js 10.x and later versions provide pipeline as a replacement for pump. It is a module method that pipes between streams, forwards errors, cleans up properly, and invokes a callback when the pipeline is complete.

Here is an example of using pipeline:

const fs = require('node:fs');
const { pipeline } = require('node:stream');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

You can also use the stream/promises module to use pipeline with async / await:

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');
const zlib = require('node:zlib');

async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed', err);
  }
}

Too Much Data, Too Quickly

There are instances where a Readable stream might give data to the Writable much too quickly — much more than the consumer can handle!

When that occurs, the consumer will begin to queue all the chunks of data for later consumption. The write queue will get longer and longer, and because of this more data must be kept in memory until the entire process has been completed.

Writing to a disk is a lot slower than reading from a disk. So when we compress a file and write it to our hard disk, backpressure will occur because the write side cannot keep up with the speed of the read side.

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);
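
The buildup can be made visible with a small sketch. It assumes a hypothetical consumer (slowWritable) that never completes its first write, and a producer that ignores the return value of .write(); the sizes are arbitrary and chosen only for illustration.

```javascript
const { Writable } = require('node:stream');

// Hypothetical slow consumer: it keeps the completion callback instead of
// calling it, so the first write never finishes during this example.
const pendingCallbacks = [];
const slowWritable = new Writable({
  highWaterMark: 1024, // bytes
  write(chunk, encoding, callback) {
    pendingCallbacks.push(callback);
  },
});

// A producer that ignores the return value of .write().
const chunk = Buffer.alloc(512);
for (let i = 0; i < 100; i++) {
  slowWritable.write(chunk);
}

// Everything that has not been consumed yet is still held in memory.
const queuedBytes = slowWritable.writableLength;
console.log(
  `queued ${queuedBytes} bytes, limit was ${slowWritable.writableHighWaterMark}`
);
```

The writableLength property reports how many bytes are waiting in the write queue, so it is a convenient way to check whether a producer is outrunning its consumer.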

This is why a backpressure mechanism is important. If a backpressure system were not present, the process would use up your system's memory, effectively slowing down other processes and monopolizing a large part of your system until completion.

This results in a few things:

  • Slowing down all other current processes
  • A very overworked garbage collector
  • Memory exhaustion

In the following examples, we will take out the return value of the .write() function and change it to true, which effectively disables backpressure support in Node.js core. Any reference to the 'modified' binary means the node binary built without the return ret; line and with return true; in its place.

Excess Drag on Garbage Collection

Let's take a look at a quick benchmark. Using the same example from above, we ran a few time trials to get an average time for both binaries.

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

Both take around a minute to run, so there's not much of a difference at all, but let's take a closer look to confirm whether our suspicions are correct. We use the dtrace tracing tool to evaluate what's happening with the V8 garbage collector.

The measured GC (garbage collector) time indicates how long a single full sweep by the garbage collector takes:

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

While the two processes start the same and seem to work the GC at the same rate, it becomes evident that after a few seconds with a properly working backpressure system in place, it spreads the GC load across consistent intervals of 4-8 milliseconds until the end of the data transfer.

However, when a backpressure system is not in place, the V8 garbage collection starts to drag out. With the normal binary, the GC fires approximately 75 times in a minute, whereas with the modified binary it fires only 36 times.

This is the slow and gradual debt accumulating from growing memory usage. As data gets transferred, without a backpressure system in place, more memory is being used for each chunk transfer.

The more memory that is being allocated, the more the GC has to take care of in one sweep. The bigger the sweep, the more the GC needs to decide what can be freed up, and scanning for detached pointers in a larger memory space will consume more computing power.

Memory Exhaustion

To determine the memory consumption of each binary, we've clocked each process with /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js individually.

This is the output on the normal binary:

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

The maximum resident set size, which is the peak amount of physical memory held by the process, turns out to be approximately 87.81 MB.

And now changing the return value of the .write() function, we get:

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

The maximum resident set size, which is the peak amount of physical memory held by the process, turns out to be approximately 1.52 GB.

Without streams in place to delegate the backpressure, more than an order of magnitude more memory is allocated: a huge margin of difference for the same process!

This experiment shows how optimized and cost-effective Node.js' backpressure mechanism is for your computing system. Now, let's do a breakdown of how it works!

How Does Backpressure Resolve These Issues?

There are different functions to transfer data from one process to another. In Node.js, there is an internal built-in function called .pipe(). There are other packages out there you can use too! Ultimately though, at the basic level of this process, we have two separate components: the source of the data and the consumer.

When .pipe() is called from the source, it signals to the consumer that there is data to be transferred. The pipe function helps to set up the appropriate backpressure closures for the event triggers.

In Node.js the source is a Readable stream and the consumer is the Writable stream (both of these may be interchanged with a Duplex or a Transform stream, but that is out-of-scope for this guide).

The moment that backpressure is triggered can be narrowed exactly to the return value of a Writable's .write() function. This return value is determined by a few conditions, of course.

In any scenario where the data buffer has exceeded the highWaterMark or the write queue is currently busy, .write() will return false.
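
As a minimal sketch of that condition, the following uses a hypothetical consumer (stalled) with a deliberately tiny highWaterMark of 4 bytes. Its write implementation never completes, so nothing leaves the buffer and the return value of .write() depends only on how much has been queued.

```javascript
const { Writable } = require('node:stream');

// Hypothetical consumer that never completes a write, so the buffer only grows.
const stalled = new Writable({
  highWaterMark: 4, // bytes
  write(chunk, encoding, callback) {
    // Intentionally never call callback().
  },
});

const first = stalled.write('ab'); // 2 bytes queued, still below the limit
const second = stalled.write('cd'); // 4 bytes queued, the limit is reached

console.log(first, second, stalled.writableNeedDrain);
```

The writableNeedDrain property is true once .write() has returned false and the buffer has not yet emptied, which makes it a handy check before writing more data.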

When a false value is returned, the backpressure system kicks in. It will pause the incoming Readable stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a 'drain' event will be emitted and resume the incoming data flow.

Once the queue is finished, backpressure will allow data to be sent again. The space in memory that was being used will free itself up and prepare for the next batch of data.

This effectively allows a fixed amount of memory to be used at any given time for a .pipe() function. There will be no memory leakage, and no infinite buffering, and the garbage collector will only have to deal with one area in memory!
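
When writing by hand instead of through .pipe(), the same cycle can be sketched as a loop that stops at the first false and waits for 'drain'. The helper name writeAll and the slow destination slowSink are hypothetical and exist only for this illustration.

```javascript
const { once } = require('node:events');
const { Writable } = require('node:stream');

// Write every chunk, waiting whenever .write() reports a full buffer.
async function writeAll(writable, chunks) {
  let waits = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits++;
      await once(writable, 'drain'); // continue only after the buffer empties
    }
  }
  writable.end();
  await once(writable, 'finish');
  return waits;
}

// Hypothetical slow destination used only for this demonstration.
const received = [];
const slowSink = new Writable({
  highWaterMark: 2, // bytes
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate asynchronous work
  },
});

const result = writeAll(slowSink, ['aa', 'bb', 'cc']);
result.then((waits) => {
  console.log(`finished after waiting for 'drain' ${waits} times`);
});
```

Because the loop never queues more than one chunk past the limit, the memory held by the write queue stays bounded no matter how many chunks are passed in.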

So, if backpressure is so important, why have you (probably) not heard of it? Well, the answer is simple: Node.js does all of this automatically for you.

That's so great! But also not so great when we are trying to understand how to implement our custom streams.

In most machines, there is a byte size that determines when a buffer is full (which will vary across different machines). Node.js allows you to set a custom highWaterMark. The default was 16 KiB (16384 bytes) for many releases and was raised to 64 KiB (65536 bytes) in Node.js v22; for objectMode streams it is 16 objects. In instances where you might want to raise that value, go for it, but do so with caution!
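
As a short sketch, a custom highWaterMark is passed as a constructor option and can be read back afterwards. The stream names and the chosen limits below are hypothetical; note that the unit is bytes for ordinary streams and objects for objectMode streams.

```javascript
const { Readable, Writable } = require('node:stream');

const byteSink = new Writable({
  highWaterMark: 1024 * 1024, // 1 MiB, counted in bytes
  write(chunk, encoding, callback) {
    callback();
  },
});

const objectSource = new Readable({
  objectMode: true,
  highWaterMark: 100, // counted in objects, not bytes
  read() {},
});

console.log(byteSink.writableHighWaterMark);
console.log(objectSource.readableHighWaterMark);
```

A larger value lets more data sit in memory before backpressure applies, so it trades memory for fewer pauses.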

Lifecycle of .pipe()

To achieve a better understanding of backpressure, here is a flow-chart on the lifecycle of a Readable stream being piped into a Writable stream:

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

If you are setting up a pipeline to chain together a few streams to manipulate your data, you will most likely be implementing a Transform stream.

In this case, the output of your Readable stream enters the Transform, and the output of the Transform is piped into the Writable.

Readable.pipe(Transformable).pipe(Writable);

Backpressure will be automatically applied, but note that both the incoming and outgoing highWaterMark of the Transform stream may be manipulated and will affect the backpressure system.
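
A minimal sketch of setting the two limits separately, assuming a hypothetical upper-casing Transform. The writableHighWaterMark option governs the incoming side and readableHighWaterMark the outgoing side; the sizes are arbitrary.

```javascript
const { Transform } = require('node:stream');

// Hypothetical upper-casing transform with a separate limit for each side.
const upperCase = new Transform({
  writableHighWaterMark: 8 * 1024, // incoming (writable) side, in bytes
  readableHighWaterMark: 32 * 1024, // outgoing (readable) side, in bytes
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

console.log(upperCase.writableHighWaterMark, upperCase.readableHighWaterMark);
```

A small incoming limit makes the upstream Readable pause sooner, while a small outgoing limit makes the Transform itself stop accepting writes sooner when the downstream Writable is slow.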

Backpressure Guidelines

Since Node.js v0.10, the Stream class has offered the ability to modify the behavior of the .read() or .write() by using the underscore version of these respective functions (._read() and ._write()).

There are guidelines documented for implementing Readable streams and implementing Writable streams. We will assume you've read these over, and the next section will go a little bit more in-depth.

Rules to Abide By When Implementing Custom Streams

The golden rule of streams is to always respect backpressure. Best practice here simply means non-contradictory practice: so long as you are careful to avoid behaviors that conflict with internal backpressure support, you can be sure you're following good practice.

In general,

  1. Never .push() if you are not asked.
  2. Never call .write() after it returns false but wait for 'drain' instead.
  3. Streams change between different Node.js versions and between the libraries you use. Be careful and test things.

In regards to point 3, an incredibly useful package for building browser streams is readable-stream. Rod Vagg has written a great blog post describing the utility of this library. In short, it provides a type of automated graceful degradation for Readable streams, and supports older versions of browsers and Node.js.

Rules specific to Readable Streams

So far, we have taken a look at how .write() affects backpressure and have focused much on the Writable stream. Because of Node.js' functionality, data is technically flowing downstream from Readable to Writable. However, as we can observe in any transmission of data, matter, or energy, the source is just as important as the destination, and the Readable stream is vital to how backpressure is handled.

Both of these processes rely on one another to communicate effectively. If the Readable ignores the Writable stream's request to stop sending data, it can be just as problematic as when the return value of .write() is incorrect.

So, as well as respecting the .write() return, we must also respect the return value of .push() used in the ._read() method. If .push() returns a false value, the stream will stop reading from the source. Otherwise, it will continue without pause.
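
To make that return value observable, here is a small sketch with a hypothetical objectMode stream (counter) whose buffer holds at most two objects. It records what .push() returns inside read(), and calls read(0) once to trigger a single refill without consuming any data.

```javascript
const { Readable } = require('node:stream');

const pushResults = [];
let next = 0;

const counter = new Readable({
  objectMode: true,
  highWaterMark: 2, // the internal buffer holds at most 2 objects
  read() {
    // Keep pushing only while .push() says there is room.
    let keepGoing = true;
    while (keepGoing) {
      keepGoing = this.push({ n: next++ });
      pushResults.push(keepGoing);
    }
  },
});

counter.read(0); // triggers one read() call without consuming any data
console.log(pushResults, counter.readableLength);
```

The last recorded value is false, which is the signal that the buffer has reached its highWaterMark and the implementation should wait for the next read() call.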

Here is an example of bad practice using .push():

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

Here is an example of good practice, where the Readable stream respects backpressure by checking the return value of this.push():

class MyReadable extends Readable {
  _read(size) {
    let chunk;
    let canPushMore = true;
    while (canPushMore && null !== (chunk = getNextChunk())) {
      canPushMore = this.push(chunk);
    }
  }
}

Additionally, from outside the custom stream, there are pitfalls to ignoring backpressure. In this counter-example of good practice, the application's code forces data through whenever it is available (signaled by the 'data' event):

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));
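
For contrast, here is a sketch of a 'data' listener that does honor the destination's signals by pausing the source until 'drain' fires. The helper name forward and the two demonstration streams are hypothetical, and error handling is left out; in real code, .pipe() or pipeline() is usually the simpler choice.

```javascript
const { Readable, Writable } = require('node:stream');

// Forward data by hand while still honoring the destination's signals.
function forward(readable, writable) {
  readable.on('data', (chunk) => {
    if (!writable.write(chunk)) {
      readable.pause(); // stop the flow until the buffer empties
      writable.once('drain', () => readable.resume());
    }
  });
  readable.on('end', () => writable.end());
}

// Hypothetical streams used only to exercise forward().
const seen = [];
const source = Readable.from(['a', 'b', 'c', 'd']);
const sink = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    seen.push(chunk);
    setImmediate(callback); // simulate a slow consumer
  },
});

const finished = new Promise((resolve) => sink.on('finish', resolve));
forward(source, sink);
finished.then(() => console.log(seen.join('')));
```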

Here's an example of using .push() with a Readable stream.

const { Readable } = require('node:stream');

// Create a custom Readable stream
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Push some data onto the stream
    this.push({ message: 'Hello, world!' });
    this.push(null); // Mark the end of the stream
  },
});

// Consume the stream
myReadableStream.on('data', chunk => {
  console.log(chunk);
});

// Output:
// { message: 'Hello, world!' }

In this example, we create a custom Readable stream that pushes a single object onto the stream using .push(). The ._read() method is called when the consumer is ready for more data, and in this case, we immediately push some data onto the stream and mark the end of the stream by pushing null.

We then consume the stream by listening for the 'data' event and logging each chunk of data that is pushed onto the stream. In this case, we only push a single chunk of data onto the stream, so we only see one log message.
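The example above pushes only one chunk. As a minimal sketch of the multi-chunk case, the stream below keeps pushing until .push() returns false (the internal buffer is full) or the data runs out. The createCounter helper and the { count } object shape are hypothetical names chosen for illustration.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: a Readable that emits the numbers 1..limit as objects.
function createCounter(limit) {
  let current = 0;
  return new Readable({
    objectMode: true,
    read() {
      // Keep pushing until the internal buffer is full (push() returns false)
      // or the data runs out. read() is called again once there is room.
      while (current < limit) {
        current += 1;
        if (!this.push({ count: current })) {
          return;
        }
      }
      this.push(null); // No more data: mark the end of the stream
    },
  });
}

const seen = [];
const counter = createCounter(5);
counter.on('data', chunk => seen.push(chunk.count));
counter.on('end', () => console.log(seen));
// Output:
// [ 1, 2, 3, 4, 5 ]
```

Returning as soon as .push() reports false leaves the remaining items unproduced until the consumer has caught up, which is what keeps memory use bounded.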

Rules specific to Writable Streams

Recall that .write() may return true or false depending on certain conditions. Luckily for us, when building our own Writable stream, the stream state machine will handle our callbacks and determine when to handle backpressure and optimize the flow of data for us.

However, when we want to use a Writable directly, we must respect the .write() return value and pay close attention to these conditions:

  • If the write queue is busy, .write() will return false.
  • If the data chunk is too large, .write() will return false (the limit is set by the highWaterMark option).
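
One way to respect the return value when using a Writable directly is to stop writing when .write() returns false and wait for the 'drain' event before continuing. The following is a sketch under stated assumptions: createSlowSink and writeAll are hypothetical helpers, and the tiny highWaterMark is chosen only to make backpressure appear quickly.

```javascript
const { Writable } = require('node:stream');
const { once } = require('node:events');

// Hypothetical slow sink used only for this demonstration.
function createSlowSink(received) {
  return new Writable({
    highWaterMark: 4, // bytes; deliberately tiny so .write() soon returns false
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      setImmediate(callback); // simulate asynchronous work
    },
  });
}

// Writes every chunk, pausing whenever .write() returns false.
async function writeAll(writable, chunks) {
  let pauses = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      pauses += 1;
      await once(writable, 'drain'); // resume only after the buffer empties
    }
  }
  // The callback passed to .end() runs once the stream has finished.
  await new Promise(resolve => writable.end(resolve));
  return pauses;
}

const received = [];
writeAll(createSlowSink(received), ['hello', 'hi', 'world']).then(pauses => {
  console.log(`wrote ${received.length} chunks, paused ${pauses} time(s)`);
});
```

The same pattern applies to any Writable, such as a file or socket stream. In many cases .pipe() or stream.pipeline() is simpler, because they handle this bookkeeping for us.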
const { Writable } = require('node:stream');

// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback();
    } else if (chunk.toString().indexOf('b') >= 0) {
      callback();
    }
    callback();
  }
}

// The proper way to write the body of _write() would be:
if (chunk.toString().indexOf('a') >= 0) {
  return callback();
}

if (chunk.toString().indexOf('b') >= 0) {
  return callback();
}
callback();

There are also some things to look out for when implementing ._writev(). The function is coupled with .cork(), but there is a common mistake when writing:

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which defers
// the call until the current operation on the JavaScript stack has completed.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// As a global function.
function doUncork(stream) {
  stream.uncork();
}

.cork() can be called as many times as we want; we just need to be careful to call .uncork() the same number of times to make the stream flow again.
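
To illustrate the counting, here is a small sketch in which a stream is corked twice and only starts flowing after the second .uncork(). The batches array and the minimal sink are hypothetical, added only to make the flush observable. writableCorked reports how many .uncork() calls are still needed.

```javascript
const { Writable } = require('node:stream');

// Hypothetical sink that records how many chunks arrive in each flush.
const batches = [];
const ws = new Writable({
  write(chunk, encoding, callback) {
    batches.push(1); // a single chunk was written on its own
    callback();
  },
  writev(chunks, callback) {
    batches.push(chunks.length); // several buffered chunks flushed together
    callback();
  },
});

ws.cork();
ws.cork(); // corked twice, so two .uncork() calls are required
ws.write('hello ');
ws.write('world ');

ws.uncork(); // still corked: nothing has been flushed yet
console.log(ws.writableCorked, batches);
// Output:
// 1 []

ws.uncork(); // the count reaches zero and both chunks go to writev() at once
console.log(ws.writableCorked, batches);
// Output:
// 0 [ 2 ]
```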

Conclusion

Streams are a frequently used module in Node.js. They are important to its internal structure and, for developers, to extending and connecting modules across the Node.js ecosystem.

Hopefully, you will now be able to troubleshoot and safely code your own Writable and Readable streams with backpressure in mind, and share your knowledge with colleagues and friends.

Be sure to read up more on Stream for other API functions to help improve and unleash your streaming capabilities when building an application with Node.js.