
According to Bjarne Stroustrup in his book The C++ Programming Language, Third Edition:

Designing and implementing a general input/output facility for a programming language is notoriously difficult… An I/O facility should be easy, convenient, and safe to use; efficient and flexible; and, above all, complete.

It shouldn’t surprise anyone that a design team, focused on providing efficient and easy I/O, has delivered such a facility through Node. Through a symmetrical and simple interface, which handles data buffers and stream events so that the implementer does not have to, Node’s Stream module is the preferred way to manage asynchronous data streams for both internal modules and, hopefully, for the modules developers will create.

A stream in Node is simply a sequence of bytes. At any time, a stream contains a buffer of bytes, and this buffer has a zero or greater length.

Because each byte in a stream is well defined, and because every type of digital data can be expressed in bytes, any part of a stream can be redirected, or “piped”, to any other stream, different chunks of the stream can be sent to different handlers, and so on. In this way stream input and output interfaces are both flexible and predictable, and can be easily coupled.

Digital streams are well described using the analogy of fluids, where individual bytes (drops of water) are being pushed through a pipe. In Node, streams are objects representing data flows that can be written to and read from asynchronously. The Node philosophy is non-blocking flow: I/O is handled via streams, and so the design of the Stream API naturally reflects this general philosophy. In fact, there is no other way of interacting with streams except in an asynchronous, evented manner—you are prevented, by design, from blocking I/O.

Five distinct base classes are exposed via the abstract Stream interface: Readable, Writable, Duplex, Transform, and PassThrough. Each base class inherits from EventEmitter, which we know of as an interface to which event listeners and emitters can be bound.

As we will learn, and will emphasize here, the Stream interface is an abstract interface. An abstract interface functions as a kind of blueprint or definition, describing the features that must be built into each constructed instance of a Stream object. For example, a Readable stream implementation is required to provide a private _read method, to which the interface’s public read method delegates.

In general, all stream implementations should follow these guidelines:

  • As long as data exists to send, write to a stream until that operation returns false, at which point the implementation should wait for the drain event, indicating that the buffered stream data has emptied (see the sketch following this list)
  • Continue to call read until a null value is received, at which point wait for a readable event prior to resuming reads
  • When implementing a readable data source or data reader, implement that interface as a Stream interface, just as several of Node’s own I/O modules do: network sockets, file readers and writers, stdin and stdout, zlib, and so on
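
For example, a minimal sketch of the first guideline might look like the following, where a hypothetical messages array is written to a deliberately slow Writable stream with a small highWaterMark, stopping whenever write returns false and resuming on drain:

var stream = require('stream');

// A small Writable that "consumes" each chunk slowly, so back pressure builds.
var writable = new stream.Writable({ highWaterMark: 16 });
writable._write = function(chunk, encoding, callback) {
  setTimeout(function() {
    process.stdout.write(chunk);
    callback();
  }, 10);
};

var messages = ['one\n', 'two\n', 'three\n', 'four\n', 'five\n'];

// Write until write() returns false, then wait for 'drain' before resuming.
function send() {
  var ok = true;
  while (messages.length && ok) {
    ok = writable.write(messages.shift());
  }
  if (messages.length) {
    writable.once('drain', send);
  }
}
send();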

It is important to note that as of Node 0.10.0 the Stream interface changed in some fundamental ways. The Node team has done its best to implement backwards-compatible interfaces, such that (most) older programs will continue to function without modification. In this article we will not spend any time discussing the specific features of this older API, focusing on the current (and future) design. The reader is encouraged to consult Node’s online documentation for information on migrating older programs.

Implementing readable streams

Streams producing data that another process may have an interest in are normally implemented using a Readable stream. A Readable stream saves the implementer all the work of managing the read queue, handling the emitting of data events, and so on.

To create a Readable stream:

var stream = require('stream');
var readable = new stream.Readable({
  encoding: "utf8",
  highWaterMark: 16000,
  objectMode: true
});

As previously mentioned, Readable is exposed as a base class, which can be initialized through three options:

  • encoding: Optionally decode buffers into the specified encoding (for example, utf8); if no encoding is given, data is delivered as raw Buffer objects.
  • highWaterMark: Number of bytes to keep in the internal buffer before ceasing to read from the data source. The default is 16 KB.
  • objectMode: Tell the stream to behave as a stream of objects instead of a stream of bytes, such as a stream of JSON objects instead of the bytes in a file. Default false.

In the following example we create a mock Feed object whose instances will inherit the Readable stream interface. Our implementation need only implement the abstract _read method of Readable, which will push data to a consumer until there is nothing more to push, at which point it triggers the Readable stream to emit an “end” event by pushing a null value:

var Feed = function(channel) {
  var readable = new stream.Readable({
    encoding: "utf8"
  });
  var news = [
    "Big Win!",
    "Stocks Down!",
    "Actor Sad!"
  ];
  readable._read = function() {
    if(news.length) {
      return readable.push(news.shift() + "\n");
    }
    readable.push(null);
  };
  return readable;
};

Now that we have an implementation, a consumer might want to instantiate the stream and listen for stream events. Two key events are readable and end.

The readable event is emitted as long as data is being pushed to the stream. It alerts the consumer to check for new data via the read method of Readable.

Note again how the Readable implementation must provide a private _read method, which services the public read method exposed to the consumer API.

The end event will be emitted whenever a null value is passed to the push method of our Readable implementation.

Here we see a consumer using these methods to display new stream data, providing a notification when the stream has stopped sending data:

var feed = new Feed();
feed.on("readable", function() {
  var data = feed.read();
  data && process.stdout.write(data);
});
feed.on("end", function() {
  console.log("No more news");
});

Similarly, we could implement a stream of objects through the use of the objectMode option:

var readable = new stream.Readable({
  objectMode: true
});
var prices = [
  { price: 1 },
  { price: 2 }
];
…
readable.push(prices.shift());
// { price: 1 }
// { price: 2 }

Here we see that each read event is receiving an object, rather than a buffer or string.
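
A fuller, runnable version of this sketch might look like the following (the prices array is the same placeholder data), with the consumer pulling whole objects from the stream:

var stream = require('stream');

var readable = new stream.Readable({ objectMode: true });
var prices = [{ price: 1 }, { price: 2 }];

readable._read = function() {
  if (prices.length) {
    return this.push(prices.shift());
  }
  this.push(null); // no more objects; triggers "end"
};

readable.on('readable', function() {
  var record;
  while ((record = readable.read()) !== null) {
    console.log(record); // { price: 1 }, then { price: 2 }
  }
});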

Finally, the read method of a Readable stream can be passed a single argument indicating the number of bytes to be read from the stream’s internal buffer. For example, if it was desired that a file should be read one byte at a time, one might implement a consumer using a routine similar to:

readable.push("Sequence of bytes");
…
feed.on("readable", function() {
  var character;
  while(character = feed.read(1)) {
    console.log(character);
  }
});
// S
// e
// q
// …

Here it should be clear that the Readable stream’s buffer was filled with a number of bytes all at once, but was read from discretely.

Pushing and pulling

We have seen how a Readable implementation will use push to populate the stream buffer for reading. When designing these implementations it is important to consider how volume is managed, at either end of the stream. Pushing more data into a stream than can be read can lead to complications around exceeding available space (memory). At the consumer end it is important to maintain awareness of termination events, and how to deal with pauses in the data stream.

One might compare the behavior of data streams running through a network with that of water running through a hose.

As with water through a hose, if a greater volume of data is being pushed into the read stream than can be efficiently drained out of the stream at the consumer end through read, a great deal of back pressure builds, causing a data backlog to begin accumulating in the stream object’s buffer. Because we are dealing with strict mathematical limitations, read simply cannot be compelled to release this pressure by reading more quickly—there may be a hard limit on available memory space, or other limitation. As such, memory usage can grow dangerously high, buffers can overflow, and so forth.

A stream implementation should therefore be aware of, and respond to, the return value of a push operation. If the operation returns false, this indicates that the implementation should cease reading from its source (and cease pushing) until the next _read request is made.

In conjunction with the above, if there is no more data to push but more is expected in the future the implementation should push an empty string (“”), which adds no data to the queue but does ensure a future readable event.
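
As a sketch of the first point, consider a _read implementation that stops pushing as soon as push returns false, with a hypothetical remaining counter standing in for the data source:

var stream = require('stream');

var readable = new stream.Readable({ encoding: 'utf8' });
var remaining = 10000; // hypothetical number of records left at the source

readable._read = function() {
  // Keep pushing until either the source is exhausted or push() returns
  // false, signalling that the internal buffer has reached highWaterMark.
  while (remaining) {
    remaining--;
    if (!this.push('record ' + remaining + '\n')) {
      return; // stop; _read will be called again once the buffer drains
    }
  }
  this.push(null); // source exhausted; emit "end"
};

readable.pipe(process.stdout);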

While the most common way to add data to a stream buffer is to push to it (queuing data in a line), there are occasions when one might want to place data at the front of the buffer (jumping the line). Node provides an unshift operation for these cases, whose behavior is identical to push apart from the aforementioned difference in buffer placement.
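
A brief sketch of unshift, which places a chunk at the front of the buffer rather than at the end (the data here is purely illustrative):

var stream = require('stream');

var readable = new stream.Readable({ encoding: 'utf8' });
readable._read = function() {}; // all data is supplied manually below

readable.push('world!\n');
readable.unshift('Hello, '); // jumps the line: placed at the front of the buffer
readable.push(null);

readable.pipe(process.stdout); // prints "Hello, world!"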

Writable streams

A Writable stream is responsible for accepting some value (a stream of bytes, a string) and writing that data to a destination. Streaming data into a file container is a common use case.

To create a Writable stream:

var stream = require('stream');
var writable = new stream.Writable({
  highWaterMark: 16000,
  decodeStrings: true
});

The Writable stream constructor can be instantiated with two options:

  • highWaterMark: The maximum number of bytes the stream’s buffer will accept prior to returning false on writes. Default is 16 KB
  • decodeStrings: Whether to convert strings into buffers before writing. Default is true.

As with Readable streams, custom Writable stream implementations must implement a _write handler, which will be passed the arguments sent to the write method of instances.

One should think of a Writable stream as a data target, such as for a file you are uploading. Conceptually this is not unlike the implementation of push in a Readable stream, where one pushes data until the data source is exhausted, passing null to terminate reading. For example, here we write 100 bytes to stdout:

var stream = require('stream');
var writable = new stream.Writable({
  decodeStrings: false
});
writable._write = function(chunk, encoding, callback) {
  console.log(chunk);
  callback();
};
var w = writable.write(new Buffer(100));
writable.end();
console.log(w); // Will be `true`

There are two key things to note here.

First, our _write implementation fires the callback function immediately after writing, a callback that is always present, regardless of whether the instance write method is passed a callback directly. This call is important for indicating the status of the write attempt, whether a failure (error) or a success.
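
For example, passing an Error object to the callback will cause the Writable stream to emit an error event. A short sketch (the size check here is purely illustrative):

var stream = require('stream');

var writable = new stream.Writable();
writable._write = function(chunk, encoding, callback) {
  if (chunk.length > 10) {
    // Signal failure: the stream will emit an "error" event.
    return callback(new Error('Chunk too large'));
  }
  process.stdout.write(chunk);
  callback(); // signal success
};

writable.on('error', function(err) {
  console.error('Write failed:', err.message);
});

writable.write('tiny\n');                        // written
writable.write('a chunk that is far too large'); // emits "error"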

Second, the call to write returned true. This indicates that the internal buffer of the Writable implementation has been emptied after executing the requested write. What if we sent a very large amount of data, enough to exceed the default size of the internal buffer? Modifying the above example, the following would return false:

var w = writable.write(new Buffer(16384));
console.log(w); // Will be 'false'

The reason this write returns false is that the write has filled the stream’s buffer to its highWaterMark, whose default value is 16 KB (16 * 1024 bytes). If we instead wrote 16383 bytes (one fewer than that threshold), write would again return true (or one could simply increase the value of highWaterMark).

What to do when write returns false? One should certainly not continue to send data! Returning to our metaphor of water in a hose: when the stream is full, one should wait for it to drain prior to sending more data. Node’s Stream implementation will emit a drain event whenever it is safe to write again. When write returns false listen for the drain event before sending more data.

Putting together what we have learned, let’s create a Writable stream with a highWaterMark value of 10 bytes. We will send a buffer containing more than 10 bytes (composed of A characters) to this stream, triggering a drain event, at which point we write a single Z character. It should be clear from this example that Node’s Stream implementation is managing the buffer overflow of our original payload, warning the original write method of this overflow, performing a controlled depletion of the internal buffer, and notifying us when it is safe to write again:

var stream = require('stream');
var writable = new stream.Writable({
  highWaterMark: 10
});
writable._write = function(chunk, encoding, callback) {
  process.stdout.write(chunk);
  callback();
};
writable.on("drain", function() {
  writable.write("Z\n");
});
var buf = new Buffer(20);
buf.fill("A");
console.log(writable.write(buf.toString())); // false

The result should be a string of 20 A characters, followed by false, then followed by the character Z.

The fluid data in a Readable stream can be easily redirected to a Writable stream. For example, the following code will take any data sent by a terminal (stdin is a Readable stream) and pass it to the destination Writable stream, stdout:

process.stdin.pipe(process.stdout);

Whenever a Writable stream is passed to a Readable stream’s pipe method, a pipe event will fire. Similarly, when a Writable stream is removed as a destination for a Readable stream, the unpipe event fires.

To remove a pipe, use the following:

readable.unpipe(destinationStream)
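
For example, the following sketch (the two-second timeout is arbitrary) pipes stdin to stdout, logs the pipe and unpipe events fired on the destination, and then removes the pipe:

process.stdout.on('pipe', function(src) {
  console.error('A source stream was piped in');   // logged to stderr
});
process.stdout.on('unpipe', function(src) {
  console.error('The source stream was unpiped');  // logged to stderr
});

process.stdin.pipe(process.stdout);

// Stop forwarding stdin to stdout after two seconds.
setTimeout(function() {
  process.stdin.unpipe(process.stdout);
}, 2000);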

Duplex streams

A Duplex stream is both readable and writable. For instance, a TCP server created in Node exposes a socket that can be both read from and written to:

var stream = require("stream");
var net = require("net");
net
  .createServer(function(socket) {
    socket.write("Go ahead and type something!");
    socket.on("readable", function() {
      process.stdout.write(this.read());
    });
  })
  .listen(8080);

When executed, this code will create a TCP server that can be connected to via Telnet:

telnet 127.0.0.1 8080

Upon connection, the connecting terminal will print out Go ahead and type something! (writing to the socket). Any text entered in the connecting terminal will be echoed to the stdout of the terminal running the TCP server (reading from the socket). This implementation of a bi-directional (duplex) communication protocol demonstrates clearly how independent processes can form the nodes of a complex and responsive application, whether communicating across a network or within the scope of a single process.

The options sent when constructing a Duplex instance merge those sent to Readable and Writable streams, with no additional parameters. Indeed, this stream type simply assumes both roles, and the rules for interacting with it follow the rules for the interactive mode being used.

As a Duplex stream assumes both read and write roles, any implementation is required to implement both _write and _read methods, again following the standard implementation details given for the relevant stream type.
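
As a minimal sketch (the line counter and logging here are illustrative), a custom Duplex stream might implement both methods like so, with its readable side producing numbered lines while its writable side simply logs whatever it receives:

var stream = require('stream');

var duplex = new stream.Duplex({ encoding: 'utf8' });
var count = 0;

// Readable side: produce a few numbered lines, then end.
duplex._read = function() {
  if (count < 3) {
    return this.push('line ' + (count++) + '\n');
  }
  this.push(null);
};

// Writable side: log whatever is written to the stream.
duplex._write = function(chunk, encoding, callback) {
  console.log('received: ' + chunk);
  callback();
};

duplex.pipe(process.stdout); // consumes the readable side
duplex.write('hello');       // exercises the writable side
duplex.end();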

Transforming streams

On occasion stream data needs to be processed, often in cases where one is writing some sort of binary protocol or other “on the fly” data transformation. A Transform stream is designed for this purpose, functioning as a Duplex stream that sits between a Readable stream and a Writable stream.

A Transform stream is initialized using the same options used to initialize a typical Duplex stream. Where Transform differs from a normal Duplex stream is in its requirement that the custom implementation merely provide a _transform method, excluding the _write and _read method requirement.

The _transform method will receive three arguments, first the sent buffer, an optional encoding argument, and finally a callback which _transform is expected to call when the transformation is complete:

_transform = function(buffer, encoding, cb) {
  var transformation = "…";
  this.push(transformation);
  cb();
};

Let’s imagine a program that wishes to convert character codes, such as ASCII (American Standard Code for Information Interchange) codes, into their corresponding characters, receiving input from stdin. We would simply pipe our input to a Transform stream, then pipe its output to stdout:

var stream = require('stream');
var converter = new stream.Transform();
converter._transform = function(num, encoding, cb) {
  this.push(String.fromCharCode(new Number(num)) + "\n");
  cb();
};
process.stdin.pipe(converter).pipe(process.stdout);

Interacting with this program might produce an output resembling the following:

65
A
66
B
256
Ā
257
ā


Using PassThrough streams

This sort of stream is a trivial implementation of a Transform stream, which simply passes received input bytes through to an output stream. This is useful if one doesn’t require any transformation of the input data, and simply wants to easily pipe a Readable stream to a Writable stream.

PassThrough streams have benefits similar to JavaScript’s anonymous functions, making it easy to assert minimal functionality without too much fuss. For example, it is not necessary to implement an abstract method, as one does for the _read method of a Readable stream. Consider the following use of a PassThrough stream as an event spy:

var fs = require('fs');
var stream = require('stream');
var spy = new stream.PassThrough();

spy.on('end', function() {
  console.log("All data has been sent");
});

fs.createReadStream("./passthrough.js").pipe(spy).pipe(process.stdout);

Summary

As we have learned, Node’s designers have succeeded in creating a simple, predictable, and convenient solution to the very difficult problem of enabling efficient I/O between disparate sources and targets. Its abstract Stream interface facilitates the instantiation of consistent readable and writable interfaces, and the extension of this interface into HTTP requests and responses, the filesystem, child processes, and other data channels makes stream programming with Node a pleasant experience.
