If you've ever worked with large files, network sockets, or real-time data processing in Node.js, you've probably come across Streams. But what exactly are Readable, Writable, and Transform streams? And how do you create custom ones?

This post will simplify Node.js streams and show you how to create your own from scratch 💡


📦 What Are Streams in Node.js?

Streams are a way to handle reading/writing data efficiently—especially large amounts of it—without loading everything into memory at once.

Memory Efficiency Example 🧠

Imagine processing a 1GB file:

// ❌ Without streams (loads entire file into memory)
const data = fs.readFileSync('large-file.csv');
processData(data);

// ✅ With streams (processes chunks at a time)
fs.createReadStream('large-file.csv')
  .pipe(processData);

Node.js provides 4 types of streams:

  • Readable – You can read data from it.
  • Writable – You can write data to it.
  • Duplex – Both readable and writable (like a socket).
  • Transform – Duplex stream that modifies the data.

🧪 Let's Build Some Streams!

We'll build each stream step-by-step:


🟢 Custom Readable Stream

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.currentCharCode = 65; // ASCII A
  }

  _read(size) {
    if (this.currentCharCode > 90) {
      this.push(null); // end of stream
    } else {
      this.push(String.fromCharCode(this.currentCharCode++));
    }
  }
}

const readable = new MyReadable();
readable.on('data', chunk => {
  console.log(`Received: ${chunk}`);
});

🧠 Explanation:

  • We start from character 'A' and push till 'Z'
  • this.push(null) tells Node we're done

🟡 Custom Writable Stream

const { Writable } = require('stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    callback(); // indicate ready for next write
  }
}

const writable = new MyWritable();
writable.write('Hello ');
writable.write('World!');
writable.end();

🧠 Explanation:

  • Every chunk passed to write() gets printed
  • callback() is important – it tells Node we're done writing the current chunk

🟣 Custom Transform Stream

const { Transform } = require('stream');

class MyTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  }
}

const transform = new MyTransform();

process.stdin.pipe(transform).pipe(process.stdout);

🧠 Explanation:

  • We take input from terminal, transform it to uppercase, and pipe it to output

Try running this in terminal and type something — it'll be converted on the fly!


🛠 Real World Use Case: File Piping with Transform

const fs = require('fs');
const { Transform } = require('stream');

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

const transformStream = new Transform({
  transform(chunk, enc, cb) {
    const updated = chunk.toString().replace(/foo/g, 'bar');
    cb(null, updated);
  }
});

readStream.pipe(transformStream).pipe(writeStream);

👆 This replaces "foo" with "bar" while streaming data between files.


🚀 Advanced Stream Features

🔄 Stream Pipeline (Better Error Handling)

The pipeline function properly handles errors and resource cleanup:

const { pipeline } = require('stream');
const fs = require('fs');

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

pipeline(
  readStream,
  transformStream,
  writeStream,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded!');
    }
  }
);

🔄 Error Handling with Streams

Always handle errors in your streams to prevent crashes:

readStream.on('error', (err) => {
  console.error('Read error:', err);
});

writeStream.on('error', (err) => {
  console.error('Write error:', err);
});

🔄 Object Mode Streams

Streams can process objects instead of just buffers/strings:

const { Transform } = require('stream');

const objectTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Process objects instead of buffers
    const transformedObject = { 
      ...chunk, 
      processed: true,
      timestamp: Date.now()
    };
    this.push(transformedObject);
    callback();
  }
});

🔄 Async Iteration with Streams (Modern JS)

ES2018 introduced a cleaner way to consume streams:

const fs = require('fs');

async function processFile() {
  const stream = fs.createReadStream('data.txt');

  // Process each chunk using for-await-of
  for await (const chunk of stream) {
    console.log('Chunk:', chunk.toString());
  }

  console.log('Stream processing complete!');
}

processFile().catch(console.error);

🔄 Understanding Backpressure

Backpressure happens when data is produced faster than it can be consumed.
Stream's pipe handles this automatically:

// ✅ Handles backpressure (waits if destination is slow)
sourceStream.pipe(destinationStream);

// ❌ Ignores backpressure (could overwhelm destination)
sourceStream.on('data', (chunk) => {
  destinationStream.write(chunk); // No flow control!
});

🧵 TL;DR

  • Readable Stream: You push data out.
  • Writable Stream: You consume data being written.
  • Transform Stream: You take input, modify it, and push it forward.
  • Pipeline: Better error management and resource cleanup.
  • Backpressure: Automatic flow control between fast producers and slow consumers.

💡 Tips

  • Always call callback() in _write() or _transform()
  • Use .push(null) to signal end of readable stream
  • Prefer streams when working with large data (files, network, etc.)
  • Don't forget to handle errors
  • Use pipeline() for safer piping

📚 One More Example?

  • One final version of custom Readable, Writable and Transform Stream
const { pipeline } = require('stream');

const { Readable, Writable, Transform } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    super(options);
    this.currentCharCode = 65; // ASCII A
  }

  _read(size) {
    if (this.currentCharCode > 90) {
      this.push(null); // end of stream
    } else {
      this.push(String.fromCharCode(this.currentCharCode++));
    }
  }
}

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`);
    callback(); // indicate ready for next write
  }
}

class MyTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  }
}

const readable = new MyReadable();
const writable = new MyWritable();
const transform = new MyTransform();

const { pipeline } = require('stream');

pipeline(
  readable,
  transform,
  writable,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Got questions? Drop them below! 🚀