Creating Custom Streams · Astro Tech Blog

Why Create Custom Streams?

Built-in streams (fs, http, zlib, crypto) cover most needs, but sometimes you need:

  • A custom data source (generate data dynamically, read from a database cursor)
  • A custom data destination (write to a database, send to a logging service)
  • A custom transformation (parse CSV, convert formats, filter data)

Creating a custom stream involves implementing a few internal methods that Node.js calls automatically.

Internal Methods You Must Implement

Stream Type   Internal Method                                    Purpose
Readable      _read(size)                                        Push data into the stream
Writable      _write(chunk, encoding, callback)                  Process incoming data
Writable      _final(callback)                                   Called before the stream finishes (optional)
Duplex        _read(size) + _write(chunk, encoding, callback)    Implement both sides
Transform     _transform(chunk, encoding, callback)              Transform incoming data
Transform     _flush(callback)                                   Flush remaining data before end (optional)
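For small, one-off streams you can skip the subclass: the Readable, Writable, Duplex, and Transform constructors also accept these methods as options (read, write, transform, flush, final, without the underscore). The sketch below uppercases text this way. It assumes Node.js 12 or later for Readable.from(), and the input strings are just sample data.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Methods passed as options stand in for the underscore-prefixed
// methods you would otherwise override in a subclass.
const received = [];

const uppercased = new Promise((resolve, reject) => {
  pipeline(
    Readable.from(['hello', ' streams']),
    new Transform({
      transform(chunk, encoding, callback) {
        // callback(null, data) is shorthand for this.push(data); callback();
        callback(null, chunk.toString().toUpperCase());
      },
    }),
    new Writable({
      write(chunk, encoding, callback) {
        received.push(chunk.toString());
        callback();
      },
    }),
    (err) => (err ? reject(err) : resolve(received.join(''))),
  );
});

uppercased.then((text) => console.log(text)); // HELLO STREAMS
```

Subclassing is still the better choice once a stream carries its own state or options, as in the examples that follow.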

1. Custom Readable Stream

A custom Readable generates data by calling this.push(data). Push null to signal the end.

Example: Random Number Generator

const { Readable } = require('stream');

class RandomNumberStream extends Readable {
  constructor(options = {}) {
    super(options);
    this.maxNumbers = options.maxNumbers || 10;
    this.count = 0;
  }

  // _read is called when the consumer wants data
  _read(size) {
    console.log(`_read called, count: ${this.count}/${this.maxNumbers}`);

    if (this.count >= this.maxNumbers) {
      // Signal end of stream
      this.push(null);
      return;
    }

    const number = Math.floor(Math.random() * 100);
    this.count++;

    // Push a Buffer (streams work with Buffers by default)
    this.push(Buffer.from(number.toString() + '\n'));
  }
}

// Usage
const randomStream = new RandomNumberStream({ maxNumbers: 5 });

randomStream.on('data', (chunk) => {
  console.log('Received:', chunk.toString().trim());
});

randomStream.on('end', () => {
  console.log('No more random numbers');
});

// Or pipe it:
// randomStream.pipe(process.stdout);
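If the data comes from something you can iterate, Readable.from() builds the stream for you and you don't write _read at all. Here is RandomNumberStream's logic rewritten as a generator; randomNumbers and collect are helper names made up for this sketch.

```javascript
const { Readable } = require('stream');

// Same idea as RandomNumberStream, written as a generator.
// Readable.from() pulls values from it as the consumer asks for data.
function* randomNumbers(maxNumbers) {
  for (let i = 0; i < maxNumbers; i++) {
    yield Math.floor(Math.random() * 100);
  }
}

// Drain any readable into an array using async iteration
async function collect(stream) {
  const values = [];
  for await (const value of stream) values.push(value);
  return values;
}

const numbers = Readable.from(randomNumbers(5)); // objectMode is on by default here
collect(numbers).then((values) => console.log(values));
```

Because Readable.from() enables objectMode, the numbers arrive as numbers rather than Buffers, so there's no Buffer.from() / toString() round trip.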

When _read Is Called

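Node.js calls _read() whenever the stream wants more data for its internal buffer, for example when:

  1. A 'data' listener is attached (flowing mode)
  2. stream.resume() is called
  3. stream.pipe(dest) is called
  4. stream.read() is called in paused mode
  5. The consumer drains the internal buffer below highWaterMark

You don't have to push synchronously, but Node.js will not call _read() again until you call this.push(). If a _read() call never pushes anything (not even null), the stream stalls. Conversely, one _read() call may push several chunks: keep going while push() returns true, and stop when it returns false, which means the buffer has reached highWaterMark.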
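To see the push() return value in action, here is a small counter stream with a deliberately tiny highWaterMark of 4. CounterStream and countTo are illustrative names, and the readCalls field exists only so you can watch how often Node.js comes back for more.

```javascript
const { Readable } = require('stream');

// Emits the integers 1..limit. Each _read() call pushes until push() returns
// false (buffer at highWaterMark), then waits for Node.js to call _read() again.
class CounterStream extends Readable {
  constructor(limit, options = {}) {
    super({ objectMode: true, highWaterMark: 4, ...options });
    this.limit = limit;
    this.current = 0;
    this.readCalls = 0; // only for observing how often _read() runs
  }

  _read() {
    this.readCalls++;
    while (this.current < this.limit) {
      this.current++;
      if (!this.push(this.current)) return; // buffer full: stop for now
    }
    this.push(null); // all numbers emitted
  }
}

async function countTo(limit) {
  const counter = new CounterStream(limit);
  const values = [];
  for await (const n of counter) values.push(n);
  return { values, readCalls: counter.readCalls };
}

countTo(10).then(({ values, readCalls }) => {
  console.log(values.join(','), `(_read called ${readCalls} times)`);
});
```

With ten numbers and a buffer of four, _read() necessarily runs several times; the exact count depends on how quickly the consumer drains the buffer.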

Example: Reading from a Database Cursor

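const { Readable } = require('stream');

// db.query(sql).cursor(), cursor.fetch(n) and cursor.close() are placeholders
// for whatever cursor API your database driver actually provides
class DatabaseQueryStream extends Readable {
  constructor(db, query, options = {}) {
    super({ objectMode: true, ...options }); // objectMode for non-Buffer data
    this.cursor = db.query(query).cursor();
    this.cursorClosed = false;
  }

  // size is advisory; in objectMode it is a count of objects (highWaterMark, 16 by default)
  async _read(size) {
    try {
      const rows = await this.cursor.fetch(size);

      if (rows.length === 0) {
        await this._closeCursor();
        this.push(null); // No more rows
        return;
      }

      for (const row of rows) {
        this.push(row); // Push objects, not Buffers
      }
    } catch (err) {
      this.destroy(err);
    }
  }

  // Also runs when the stream is destroyed early (error, or the consumer gave up)
  _destroy(err, callback) {
    this._closeCursor().then(() => callback(err), (closeErr) => callback(err || closeErr));
  }

  async _closeCursor() {
    if (this.cursorClosed) return;
    this.cursorClosed = true;
    await this.cursor.close();
  }
}

// Usage (db is your database client)
const userStream = new DatabaseQueryStream(db, 'SELECT * FROM users');

userStream.on('data', (user) => {
  console.log('User:', user.name, user.email);
});

userStream.on('error', (err) => {
  console.error('Query failed:', err.message);
});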

objectMode: By default, streams work with Buffers. Set objectMode: true to push arbitrary JavaScript objects.
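A quick way to see the difference between the two modes, using only the built-in stream module (the records are sample data and readAll is an illustrative helper):

```javascript
const { Readable } = require('stream');

// objectMode: each push() delivers one JavaScript value, unchanged, and
// highWaterMark counts values (default 16) rather than bytes.
const objects = new Readable({ objectMode: true, read() {} });
objects.push({ id: 1, name: 'Ada' });
objects.push({ id: 2, name: 'Alan' });
objects.push(null);

// Byte mode (the default) accepts only strings, Buffers and Uint8Arrays.
// Pushing a plain object does not throw; the stream emits 'error' instead.
const bytes = new Readable({ read() {} });
const byteModeError = new Promise((resolve) => {
  bytes.once('error', (err) => resolve(err.code));
});
bytes.push({ id: 1 });

async function readAll(stream) {
  const items = [];
  for await (const item of stream) items.push(item);
  return items;
}

console.log(objects.readableHighWaterMark); // 16
const objectsRead = readAll(objects);
objectsRead.then((items) => console.log(items));
byteModeError.then((code) => console.log(code)); // ERR_INVALID_ARG_TYPE
```

Transform and Duplex streams also accept readableObjectMode and writableObjectMode, so one side can carry bytes while the other carries objects.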

2. Custom Writable Stream

A custom Writable processes data by implementing _write(chunk, encoding, callback).

Example: Line-by-Line File Writer with Timestamps

const fs = require('fs');
const { Writable } = require('stream');

class TimestampedFileWriter extends Writable {
  constructor(filePath, options = {}) {
    super(options);
    this.stream = fs.createWriteStream(filePath, { flags: 'a' });
    this.lineCount = 0;
    // Surface errors from the underlying file stream on this stream
    this.stream.on('error', (err) => this.destroy(err));
  }

  // With the default decodeStrings: true, string chunks arrive as Buffers and
  // encoding is 'buffer'; with decodeStrings: false they stay strings and
  // encoding names their encoding (e.g. 'utf8')
  _write(chunk, encoding, callback) {
    const line = chunk.toString().trim();
    this.lineCount++;

    // Add timestamp prefix
    const timestamp = new Date().toISOString();
    const output = `[${timestamp}] Line ${this.lineCount}: ${line}\n`;

    // Write to the file; callback runs once the file stream has flushed it
    this.stream.write(output, callback);
  }

  // Called after the last _write, before 'finish'
  _final(callback) {
    console.log(`Total lines written: ${this.lineCount}`);
    this.stream.end(callback);
  }
}

// Usage
const writer = new TimestampedFileWriter('output.log');

writer.on('finish', () => {
  console.log('All data written');
});

writer.write('First entry');
writer.write('Second entry');
writer.end('Last entry');
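For destinations like databases, handling one chunk per call can be slow. Writable also supports an optional _writev(chunks, callback) method that receives every buffered chunk at once; cork() and uncork() are one way to make chunks pile up. A sketch with a hypothetical BatchWriter that just records its batches in memory (writeBatched is an illustrative helper):

```javascript
const { Writable } = require('stream');

// Hypothetical batch sink: in a real application each batch could become one
// multi-row INSERT. Here the batches are only recorded in memory.
class BatchWriter extends Writable {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.batches = [];
  }

  // Used when a single chunk is waiting
  _write(record, encoding, callback) {
    this.batches.push([record]);
    callback();
  }

  // Used when several chunks are buffered, e.g. after cork()/uncork()
  _writev(entries, callback) {
    this.batches.push(entries.map((entry) => entry.chunk));
    callback();
  }
}

function writeBatched(records) {
  return new Promise((resolve, reject) => {
    const writer = new BatchWriter();
    writer.on('finish', () => resolve(writer.batches));
    writer.on('error', reject);

    writer.cork(); // buffer the following writes instead of calling _write
    for (const record of records) writer.write(record);
    process.nextTick(() => {
      writer.uncork(); // flush everything buffered in one _writev call
      writer.end();
    });
  });
}

writeBatched([{ id: 1 }, { id: 2 }, { id: 3 }]).then((batches) => {
  console.log(batches.length, batches[0].length); // 1 3
});
```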

Understanding callback in _write

The callback is critical. It tells the writable stream that you’ve finished processing the chunk:

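_write(chunk, encoding, callback) {
  // doSomethingAsync is a placeholder for your own asynchronous work
  doSomethingAsync(chunk, (err) => {
    if (err) {
      callback(err);  // Propagate error: the stream will emit 'error'
    } else {
      callback();     // Signal: ready for the next chunk
    }
  });
}

If you never call callback(), the stream stalls: later chunks stay buffered, and neither 'drain' nor 'finish' is ever emitted. Calling it more than once is also a bug; Node.js reports it as an ERR_MULTIPLE_CALLBACK error.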
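The same rule applies when the async work returns a promise. In this sketch saveRecord is a hypothetical stand-in for a real database or network call, rigged to fail on the record 'bad'; AsyncSink and saveAll are illustrative names, and pipeline comes from stream/promises (Node.js 15+).

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical async operation standing in for a network call or DB insert.
// It fails for the record 'bad' so the error path can be observed.
function saveRecord(record) {
  return new Promise((resolve, reject) => {
    setImmediate(() => {
      if (record === 'bad') reject(new Error(`cannot save ${record}`));
      else resolve();
    });
  });
}

class AsyncSink extends Writable {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.saved = [];
  }

  _write(record, encoding, callback) {
    // Exactly one callback per chunk: callback() on success, callback(err) on failure
    saveRecord(record).then(() => {
      this.saved.push(record);
      callback();
    }, callback);
  }
}

async function saveAll(records) {
  const sink = new AsyncSink();
  try {
    await pipeline(Readable.from(records), sink);
    return { saved: sink.saved, error: null };
  } catch (err) {
    return { saved: sink.saved, error: err.message };
  }
}

saveAll(['a', 'bad', 'c']).then((result) => console.log(result));
```

Because _write is not called again until the previous callback runs, the failure on 'bad' stops the pipeline before 'c' is ever saved.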

3. Custom Transform Stream

Transform streams modify data as it passes through. Implement _transform(chunk, encoding, callback).
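A transform is not limited to one output per input: each _transform call may push nothing, one chunk, or many. A sketch with a hypothetical word splitter that filters and fans out in the same step (createWordSplitter and splitWords are illustrative names; stream/promises needs Node.js 15+):

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Splits each line into words; blank lines produce no output at all.
function createWordSplitter() {
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      for (const word of line.split(/\s+/)) {
        if (word) this.push(word); // zero, one, or many pushes per input line
      }
      callback();
    },
  });
}

async function splitWords(lines) {
  const words = [];
  await pipeline(
    Readable.from(lines),
    createWordSplitter(),
    new Writable({
      objectMode: true,
      write(word, encoding, callback) {
        words.push(word);
        callback();
      },
    }),
  );
  return words;
}

splitWords(['hello stream world', '', '   ', 'bye']).then((words) => console.log(words));
```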

Example: CSV to JSON Converter

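const { Transform } = require('stream');

// Naive CSV parser: splits on commas only (no quoted fields or escaped commas)
class CsvToJsonTransform extends Transform {
  constructor(options = {}) {
    // Bytes in, objects out
    super({ readableObjectMode: true, ...options });
    this.headers = null;
    this.remainder = ''; // Incomplete last line from the previous chunk
  }

  _transform(chunk, encoding, callback) {
    // A chunk from fs.createReadStream can hold many lines, or end mid-line
    const lines = (this.remainder + chunk.toString()).split('\n');
    this.remainder = lines.pop();

    for (const line of lines) {
      this._processLine(line);
    }
    callback();
  }

  _processLine(rawLine) {
    const line = rawLine.trim();
    if (!line) return;

    const values = line.split(',');

    if (!this.headers) {
      // First line is the header
      this.headers = values.map(h => h.trim().toLowerCase());
      return;
    }

    // Convert to object
    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header] = values[i]?.trim();
    });

    // Push the transformed object
    this.push(obj);
  }

  _flush(callback) {
    // The file may not end with a newline
    this._processLine(this.remainder);
    console.log('CSV parsing complete');
    callback();
  }
}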

// Usage: imports for the pipeline below
const fs = require('fs');
const { pipeline } = require('stream');

CsvToJsonTransform emits JavaScript objects, but fs.createWriteStream() accepts only strings and Buffers, so the objects have to be serialized before they reach the file. Let's add a Transform that turns the object stream into the text of a JSON array:

class ConcatJsonTransform extends Transform {
  constructor(options = {}) {
    // Objects in, text out
    super({ writableObjectMode: true, ...options });
    this.isFirst = true;
  }

  _transform(obj, encoding, callback) {
    // Open the array before the first object, separate later ones with commas
    this.push(this.isFirst ? '[\n' : ',\n');
    this.isFirst = false;

    this.push(JSON.stringify(obj, null, 2));
    callback();
  }

  _flush(callback) {
    // Close the array; emit [] if no objects arrived at all
    this.push(this.isFirst ? '[]\n' : '\n]\n');
    callback();
  }
}

// Now the full pipeline
pipeline(
  fs.createReadStream('data.csv', { encoding: 'utf8' }),
  new CsvToJsonTransform(),
  new ConcatJsonTransform(),
  fs.createWriteStream('output.json'),
  (err) => {
    if (err) return console.error('Pipeline failed:', err.message);
    console.log('CSV → JSON conversion complete');
  },
);
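In recent Node.js versions, pipeline() also accepts an async generator function as a middle stage, which can replace a small Transform class. The sketch below re-creates the header-to-object logic of CsvToJsonTransform on in-memory sample lines; parseCsvLines is an illustrative name, and the code assumes stream/promises from Node.js 15+.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Turns CSV lines (header first) into objects using an async generator stage
async function parseCsvLines(lines) {
  const rows = [];
  await pipeline(
    Readable.from(lines),
    // Middle stage: receives the previous stage as an async iterable
    async function* toObjects(source) {
      let headers = null;
      for await (const line of source) {
        const values = line.split(',').map((value) => value.trim());
        if (!headers) {
          headers = values.map((header) => header.toLowerCase());
          continue;
        }
        yield Object.fromEntries(headers.map((header, i) => [header, values[i]]));
      }
    },
    // Last stage: collect the objects
    new Writable({
      objectMode: true,
      write(row, encoding, callback) {
        rows.push(row);
        callback();
      },
    }),
  );
  return rows;
}

parseCsvLines(['Name,Age', 'Ada,36', 'Alan,41']).then((rows) => console.log(rows));
```

A class is still the better fit when the stage needs its own options, state shared across several methods, or reuse across many pipelines.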

Example: Progress Reporting Transform

const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

class ProgressTransform extends Transform {
  constructor(totalSize, options = {}) {
    super(options);
    this.totalSize = totalSize;
    this.processed = 0;
    this.lastStep = -1;
  }

  _transform(chunk, encoding, callback) {
    this.processed += chunk.length;
    const percent = this.totalSize > 0
      ? Math.min(100, Math.floor((this.processed / this.totalSize) * 100))
      : 100;
    const step = Math.floor(percent / 5); // one bar segment per 5%

    // Redraw only when a new 5% step is reached; chunks rarely land exactly on a multiple of 5
    if (step !== this.lastStep) {
      const bar = '█'.repeat(step) + '░'.repeat(20 - step);
      process.stdout.write(`\r${bar} ${percent}%`); // \r redraws the same terminal line
      this.lastStep = step;
    }

    this.push(chunk); // Pass through unchanged
    callback();
  }
}

// Usage: show progress during file copy
async function copyWithProgress(src, dest) {
  const { size } = await fs.promises.stat(src);

  // Awaiting the promise-based pipeline keeps the function from returning before the copy ends
  await pipeline(
    fs.createReadStream(src),
    new ProgressTransform(size),
    fs.createWriteStream(dest),
  );
  console.log('\nCopy complete!');
}

copyWithProgress('large-file.zip', 'large-file-copy.zip')
  .catch((err) => console.error('\nCopy failed:', err.message));
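ProgressTransform writes straight to the terminal. A variant that emits an event instead lets the caller decide how to display progress. In this sketch 'progress' is an event name chosen for illustration (Node.js does not define it), ProgressEmitter and trackProgress are hypothetical names, and the chunks are zero-filled sample Buffers.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Emits a custom 'progress' event with the percentage processed so far
class ProgressEmitter extends Transform {
  constructor(totalSize, options = {}) {
    super(options);
    this.totalSize = totalSize;
    this.processed = 0;
  }

  _transform(chunk, encoding, callback) {
    this.processed += chunk.length;
    const percent = this.totalSize > 0
      ? Math.min(100, Math.floor((this.processed / this.totalSize) * 100))
      : 100;
    this.emit('progress', percent);
    callback(null, chunk); // pass the data through unchanged
  }
}

function trackProgress(chunks) {
  const total = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
  const progress = new ProgressEmitter(total);
  const seen = [];
  progress.on('progress', (percent) => seen.push(percent));

  return new Promise((resolve, reject) => {
    pipeline(
      Readable.from(chunks),
      progress,
      new Writable({ write(chunk, encoding, callback) { callback(); } }), // discard data
      (err) => (err ? reject(err) : resolve(seen)),
    );
  });
}

trackProgress([Buffer.alloc(25), Buffer.alloc(25), Buffer.alloc(50)])
  .then((seen) => console.log(seen)); // [ 25, 50, 100 ]
```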

4. Custom Duplex Stream

Duplex streams have both a readable and a writable side. The two sides are independent.

Example: In-Memory Queue

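const { Duplex } = require('stream');

class QueueStream extends Duplex {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.queue = [];
    this.wantsData = false;  // true while the readable side is asking for data
    this.writeEnded = false; // true once end() has been called on the writable side
  }

  // Writable side: called when data is written
  _write(chunk, encoding, callback) {
    this.queue.push(chunk);
    callback(); // Note: the queue itself is unbounded
    this._drain();
  }

  // Writable side: called after the last write
  _final(callback) {
    this.writeEnded = true;
    this._drain();
    callback();
  }

  // Readable side: called when the consumer wants data
  _read(size) {
    this.wantsData = true;
    this._drain();
  }

  _drain() {
    // Hand queued items to the readable side until its buffer is full
    while (this.wantsData && this.queue.length > 0) {
      this.wantsData = this.push(this.queue.shift());
    }
    // End the readable side once writing has finished and the queue is empty
    if (this.writeEnded && this.queue.length === 0) {
      this.push(null);
    }
  }
}

// Usage
const queue = new QueueStream();

queue.on('data', (item) => {
  console.log('Read from queue:', item);
});

queue.on('end', () => {
  console.log('Queue drained and closed');
});

queue.write('First');
queue.write('Second');
queue.write('Third');
queue.end();

// Output:
// Read from queue: First
// Read from queue: Second
// Read from queue: Third
// Queue drained and closed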
5. The _flush Method (Transform)

_flush is called once all written data has passed through _transform, just before the readable side ends. Use it to push any data you have been holding back:

const { Transform } = require('stream');

class ChunkCollector extends Transform {
  constructor(options = {}) {
    super(options);
    this.buffer = [];
  }

  _transform(chunk, encoding, callback) {
    // Hold every chunk back (the whole stream ends up in memory)
    this.buffer.push(chunk);
    callback();
  }

  _flush(callback) {
    // Concatenate all chunks and push as one
    const combined = Buffer.concat(this.buffer);
    this.push(combined);
    callback();
  }
}

Complete Example: Line Reader Transform

const fs = require('fs');
const { StringDecoder } = require('string_decoder');
const { Transform, Writable, pipeline } = require('stream');

class LineSplitter extends Transform {
  constructor(options = {}) {
    // readableObjectMode: each line stays a separate string chunk (in byte mode,
    // lines could be concatenated on read and empty lines would be dropped)
    super({ readableObjectMode: true, ...options });
    this.decoder = new StringDecoder('utf8'); // Keeps multi-byte characters intact across chunks
    this.buffer = '';  // Hold incomplete lines
  }

  _transform(chunk, encoding, callback) {
    this.buffer += this.decoder.write(chunk);

    const lines = this.buffer.split('\n');

    // The last element might be incomplete: keep it
    this.buffer = lines.pop();

    for (const line of lines) {
      this.push(line);
    }

    callback();
  }

  _flush(callback) {
    // Push any remaining data
    this.buffer += this.decoder.end();
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}

// Usage: process a large log file line by line
pipeline(
  fs.createReadStream('huge-log.log', { highWaterMark: 65536 }),
  new LineSplitter(),
  // The last stage only logs, so a Writable that consumes the lines is enough
  new Writable({
    objectMode: true,
    write(line, encoding, callback) {
      if (line.includes('ERROR')) {
        console.log('ERROR:', line);
      }
      callback();
    },
  }),
  (err) => { if (err) console.error('Error:', err); },
);
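If all you need is line splitting, Node.js also ships the readline module, whose interface can be consumed with for await. A sketch, with readLines as an illustrative helper name; the chunk boundaries are deliberately placed mid-line to show they don't matter.

```javascript
const readline = require('readline');
const { Readable } = require('stream');

// Built-in alternative to a hand-written splitter: readline accepts any
// Readable as input and yields one line at a time (handles \n and \r\n).
async function readLines(input) {
  const rl = readline.createInterface({ input, crlfDelay: Infinity });
  const lines = [];
  for await (const line of rl) lines.push(line);
  return lines;
}

// Chunk boundaries do not need to line up with line endings
readLines(Readable.from([Buffer.from('first\nsec'), Buffer.from('ond\r\nthird')]))
  .then((lines) => console.log(lines));
```

Writing LineSplitter yourself still pays off when the lines must keep flowing through pipeline() into further stream stages.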

Key Takeaways

  • Implement _read() for Readable: push data with this.push(data), signal end with this.push(null)
  • Implement _write(chunk, enc, callback) for Writable: call callback() exactly once per chunk
  • Implement _transform(chunk, enc, callback) for Transform: push transformed data, then call callback
  • Implement _flush(callback) (optional) for Transform: called after the last _transform, just before the stream ends
  • objectMode: true lets you push JavaScript objects instead of Buffers
  • callback() is the stream’s flow control mechanism: never forget to call it
  • Prefer pipeline() over pipe() when working with custom streams: it forwards errors and destroys every stream in the chain on failure
  • Custom streams are created by extending Readable, Writable, Duplex, or Transform
  • _final(callback) is the Writable equivalent of _flush β€” called before 'finish'
  • Test custom streams by piping to/from known streams (process.stdout, fs.createWriteStream)