If you've ever worked with large files, network sockets, or real-time data processing in Node.js, you've probably come across Streams. But what exactly are Readable, Writable, and Transform streams? And how do you create custom ones?
This post will simplify Node.js streams and show you how to create your own from scratch 💡
📦 What Are Streams in Node.js?
Streams are a way to handle reading/writing data efficiently—especially large amounts of it—without loading everything into memory at once.
Memory Efficiency Example 🧠
Imagine processing a 1GB file:
const fs = require('fs');

// ❌ Without streams (loads the entire file into memory)
const data = fs.readFileSync('large-file.csv');
processData(data);

// ✅ With streams (processes one chunk at a time)
fs.createReadStream('large-file.csv')
  .pipe(processData); // here processData would be a Writable/Transform stream
Node.js provides 4 types of streams:
- Readable – You can read data from it.
- Writable – You can write data to it.
- Duplex – Both readable and writable (like a socket); a minimal sketch follows after this list.
- Transform – Duplex stream that modifies the data.
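Duplex streams don't get their own section below, so here's a minimal sketch of one. The class and its behavior are made up for illustration: the readable side emits the alphabet while the writable side just logs whatever it receives, which shows that the two sides are independent.

const { Duplex } = require('stream');

// Illustrative only: the readable and writable sides of a Duplex are independent.
// The readable side emits 'A'..'Z'; the writable side logs what it receives.
class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.currentCharCode = 65; // ASCII 'A'
  }

  _read(size) {
    if (this.currentCharCode > 90) {
      this.push(null); // readable side is done
    } else {
      this.push(String.fromCharCode(this.currentCharCode++));
    }
  }

  _write(chunk, encoding, callback) {
    console.log(`Duplex received: ${chunk.toString()}`);
    callback();
  }
}

const duplex = new MyDuplex();
duplex.on('data', chunk => console.log(`Duplex emitted: ${chunk}`));
duplex.write('hello');
duplex.end();

A Transform is essentially a Duplex whose writable side feeds its readable side, which is why it gets its own section below.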
🧪 Let's Build Some Streams!
We'll build each stream step-by-step:
🟢 Custom Readable Stream
const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.currentCharCode = 65; // ASCII 'A'
  }

  _read(size) {
    if (this.currentCharCode > 90) {
      this.push(null); // end of stream
    } else {
      this.push(String.fromCharCode(this.currentCharCode++));
    }
  }
}

const readable = new MyReadable();

readable.on('data', chunk => {
  console.log(`Received: ${chunk}`);
});
🧠 Explanation:
- We start from character 'A' and push till 'Z'
- this.push(null) tells Node we're done
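By the way, the 'data' event is only one way to consume a readable stream. If you'd rather pull data yourself, you can listen for 'readable' and call read() in a loop. A quick sketch using the same MyReadable class (the variable name is just for this example):

const readable2 = new MyReadable();

readable2.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is empty
  while ((chunk = readable2.read()) !== null) {
    console.log(`Pulled: ${chunk}`);
  }
});

readable2.on('end', () => console.log('No more data'));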
🟡 Custom Writable Stream
const { Writable } = require('stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    callback(); // indicate ready for next write
  }
}

const writable = new MyWritable();

writable.write('Hello ');
writable.write('World!');
writable.end();
🧠 Explanation:
- Every chunk passed to write() gets printed
- callback() is important – it tells Node we're done writing the current chunk
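One more detail worth knowing: end() flushes anything still buffered, and the stream emits 'finish' once every chunk has been written, which is a handy place for cleanup. For example:

const writable2 = new MyWritable();

writable2.on('finish', () => {
  console.log('All writes are complete');
});

writable2.write('Hello ');
writable2.write('World!');
writable2.end(); // triggers 'finish' once buffered writes are flushed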
🟣 Custom Transform Stream
const { Transform } = require('stream');

class MyTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  }
}

const transform = new MyTransform();

process.stdin.pipe(transform).pipe(process.stdout);
🧠 Explanation:
- We take input from the terminal, convert it to uppercase, and pipe it to stdout
Try running this in a terminal and type something; it'll be converted on the fly!
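A small convenience: instead of calling this.push() and then callback(), you can hand the transformed data to the callback directly as its second argument. The two forms are equivalent (the class name here is just for this sketch):

const { Transform } = require('stream');

// Same behavior as MyTransform above, using the callback shorthand
class MyUppercase extends Transform {
  _transform(chunk, encoding, callback) {
    // callback(error, data) pushes data for you
    callback(null, chunk.toString().toUpperCase());
  }
}

You'll see this shorthand again in the file piping example below.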
🛠 Real World Use Case: File Piping with Transform
const fs = require('fs');
const { Transform } = require('stream');

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

const transformStream = new Transform({
  transform(chunk, enc, cb) {
    // Note: a per-chunk replace can miss a "foo" that is split across two chunks
    const updated = chunk.toString().replace(/foo/g, 'bar');
    cb(null, updated);
  }
});

readStream.pipe(transformStream).pipe(writeStream);
👆 This replaces "foo" with "bar" while streaming data between files.
🚀 Advanced Stream Features
🔄 Stream Pipeline (Better Error Handling)
The pipeline function properly handles errors and resource cleanup:
const { pipeline, Transform } = require('stream');
const fs = require('fs');

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

// the same foo -> bar transform as in the previous example
const transformStream = new Transform({
  transform(chunk, enc, cb) {
    cb(null, chunk.toString().replace(/foo/g, 'bar'));
  }
});

pipeline(
  readStream,
  transformStream,
  writeStream,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded!');
    }
  }
);
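On newer Node versions (15+) there is also a promise-based pipeline in the stream/promises module, which pairs nicely with async/await. A minimal sketch, assuming the same three streams as above:

const { pipeline } = require('stream/promises');

async function run() {
  // resolves when the whole chain finishes, rejects if any stream errors
  await pipeline(readStream, transformStream, writeStream);
  console.log('Pipeline succeeded!');
}

run().catch(err => console.error('Pipeline failed:', err));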
🔄 Error Handling with Streams
Always handle errors in your streams to prevent crashes:
readStream.on('error', (err) => {
  console.error('Read error:', err);
});

writeStream.on('error', (err) => {
  console.error('Write error:', err);
});
🔄 Object Mode Streams
Streams can process objects instead of just buffers/strings:
const { Transform } = require('stream');

const objectTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Process objects instead of buffers
    const transformedObject = {
      ...chunk,
      processed: true,
      timestamp: Date.now()
    };
    this.push(transformedObject);
    callback();
  }
});
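To try it out, you can feed objects in from any iterable using Readable.from (which creates an object-mode readable by default) and listen for the transformed objects coming out:

const { Readable } = require('stream');

// Readable.from turns an iterable of objects into an object-mode stream
const source = Readable.from([
  { id: 1, name: 'alice' },
  { id: 2, name: 'bob' }
]);

source.pipe(objectTransform).on('data', obj => {
  console.log(obj); // { id: 1, name: 'alice', processed: true, timestamp: ... }
});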
🔄 Async Iteration with Streams (Modern JS)
ES2018 introduced a cleaner way to consume streams:
const fs = require('fs');

async function processFile() {
  const stream = fs.createReadStream('data.txt');

  // Process each chunk using for-await-of
  for await (const chunk of stream) {
    console.log('Chunk:', chunk.toString());
  }

  console.log('Stream processing complete!');
}

processFile().catch(console.error);
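Since every readable stream is async-iterable, the same pattern also works on the output of a transform. A small sketch that reuses the MyTransform class from earlier:

const fs = require('fs');

async function processUppercased() {
  // pipe() returns its destination, and that transform is itself async-iterable
  const upperStream = fs.createReadStream('data.txt').pipe(new MyTransform());

  for await (const chunk of upperStream) {
    console.log('Upper chunk:', chunk.toString());
  }
}

processUppercased().catch(console.error);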
🔄 Understanding Backpressure
Backpressure happens when data is produced faster than it can be consumed.
The pipe() method handles this automatically:
// ✅ Handles backpressure (waits if destination is slow)
sourceStream.pipe(destinationStream);

// ❌ Ignores backpressure (could overwhelm destination)
sourceStream.on('data', (chunk) => {
  destinationStream.write(chunk); // No flow control!
});
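If you do consume 'data' events by hand, you can respect backpressure yourself by checking the return value of write() and waiting for the 'drain' event. A rough sketch:

sourceStream.on('data', (chunk) => {
  const canContinue = destinationStream.write(chunk);
  if (!canContinue) {
    // destination's buffer is full: pause until it drains
    sourceStream.pause();
    destinationStream.once('drain', () => sourceStream.resume());
  }
});

sourceStream.on('end', () => destinationStream.end());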
🧵 TL;DR
- Readable Stream: You push data out.
- Writable Stream: You consume data being written.
- Transform Stream: You take input, modify it, and push it forward.
- Pipeline: Better error management and resource cleanup.
- Backpressure: Automatic flow control between fast producers and slow consumers.
💡 Tips
- Always call callback() in _write() or _transform()
- Use .push(null) to signal the end of a readable stream
- Prefer streams when working with large data (files, network, etc.)
- Don't forget to handle errors
- Use pipeline() for safer piping
📚 One More Example?
Here's a final version that wires the custom Readable, Transform, and Writable streams together with pipeline():
const { pipeline, Readable, Writable, Transform } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.currentCharCode = 65; // ASCII 'A'
  }

  _read(size) {
    if (this.currentCharCode > 90) {
      this.push(null); // end of stream
    } else {
      this.push(String.fromCharCode(this.currentCharCode++));
    }
  }
}

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    callback(); // indicate ready for next write
  }
}

class MyTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  }
}

const readable = new MyReadable();
const writable = new MyWritable();
const transform = new MyTransform();

pipeline(
  readable,
  transform,
  writable,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
Got questions? Drop them below! 🚀