Why Create Custom Streams?
Built-in streams (fs, http, zlib, crypto) cover most needs, but sometimes you need:
- A custom data source (generate data dynamically, read from a database cursor)
- A custom data destination (write to a database, send to a logging service)
- A custom transformation (parse CSV, convert formats, filter data)
Creating a custom stream involves implementing a few internal methods that Node.js calls automatically.
Internal Methods You Must Implement
| Stream Type | Internal Method | Purpose |
|---|---|---|
| Readable | _read(size) | Push data into the stream |
| Writable | _write(chunk, encoding, callback) | Process incoming data |
| Writable | _final(callback) | Called after the last _write, before 'finish' (optional) |
| Duplex | _read(size) + _write(chunk, enc, callback) | Both sides |
| Transform | _transform(chunk, encoding, callback) | Transform incoming data |
| Transform | _flush(callback) | Flush remaining data before end (optional) |
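Besides subclassing, Node.js also accepts these methods as constructor options (`read`, `write`, `final`, `transform`, `flush`), which is handy for small one-off streams. A minimal sketch using only the built-in `stream` module; the factory names `makeLetters`, `makeUpper`, `makeCollector` and `runDemo` are illustrative, not part of any API:

```javascript
const { Readable, Writable, Transform, pipeline } = require('stream');

// Readable: the `read` option is used as _read()
function makeLetters() {
  const letters = ['a', 'b', 'c'];
  return new Readable({
    objectMode: true,
    read() {
      const next = letters.shift();
      // shift() returns undefined once the array is empty: end the stream
      this.push(next === undefined ? null : next);
    },
  });
}

// Transform: the `transform` option is used as _transform()
function makeUpper() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      callback(null, chunk.toUpperCase()); // same as this.push(...) then callback()
    },
  });
}

// Writable: the `write` and `final` options are used as _write() and _final()
function makeCollector(collected) {
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      collected.push(chunk);
      callback();
    },
    final(callback) {
      collected.push('<end>');
      callback();
    },
  });
}

function runDemo() {
  const collected = [];
  return new Promise((resolve, reject) => {
    pipeline(makeLetters(), makeUpper(), makeCollector(collected), (err) => {
      if (err) reject(err);
      else resolve(collected);
    });
  });
}

runDemo().then((out) => console.log(out)); // [ 'A', 'B', 'C', '<end>' ]
```

Subclassing remains the better fit once a stream carries its own state or helper methods, as in the examples that follow.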
1. Custom Readable Stream
A custom Readable generates data by calling this.push(data). Push null to signal the end.
Example: Random Number Generator
```javascript
const { Readable } = require('stream');

class RandomNumberStream extends Readable {
  constructor(options = {}) {
    super(options);
    this.maxNumbers = options.maxNumbers || 10;
    this.count = 0;
  }

  // _read is called when the consumer wants data
  _read(size) {
    console.log(`_read called, count: ${this.count}/${this.maxNumbers}`);

    if (this.count >= this.maxNumbers) {
      // Signal end of stream
      this.push(null);
      return;
    }

    const number = Math.floor(Math.random() * 100);
    this.count++;

    // Push a Buffer (streams work with Buffers by default)
    this.push(Buffer.from(number.toString() + '\n'));
  }
}

// Usage
const randomStream = new RandomNumberStream({ maxNumbers: 5 });

randomStream.on('data', (chunk) => {
  console.log('Received:', chunk.toString().trim());
});

randomStream.on('end', () => {
  console.log('No more random numbers');
});

// Or pipe it:
// randomStream.pipe(process.stdout);
```
When _read Is Called
Node.js calls _read() when:
- A `'data'` listener is attached (flowing mode)
- `stream.resume()` is called
- `stream.pipe(dest)` is called
- The internal buffer is below `highWaterMark`
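Inside `_read()` you must eventually call `this.push(data)`, either synchronously or later from an async callback. Until you push, Node.js will not call `_read()` again, so a `_read()` that never pushes stalls the stream. Keep pushing while `push()` returns `true`; once it returns `false`, the buffer has reached `highWaterMark`, and Node.js will call `_read()` again after the consumer has drained it.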
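Because `push()` reports when the buffer is full, a `_read()` that has plenty of data ready can loop until `push()` returns `false` and then simply wait for the next call. A small sketch of that loop; `CounterStream`, its `max` argument and the `readCalls` counter are illustrative names, not part of Node.js:

```javascript
const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max, options = {}) {
    // objectMode, so highWaterMark counts items (here: 4 items)
    super({ objectMode: true, highWaterMark: 4, ...options });
    this.max = max;
    this.current = 0;
    this.readCalls = 0; // how many times Node.js called _read()
  }

  _read() {
    this.readCalls++;
    while (this.current < this.max) {
      this.current++;
      // push() returns false when the buffer is full: stop here and
      // wait for Node.js to call _read() again
      if (!this.push(this.current)) return;
    }
    this.push(null); // all numbers produced
  }
}

async function collectCounter(max) {
  const stream = new CounterStream(max);
  const values = [];
  for await (const n of stream) values.push(n);
  return { values, readCalls: stream.readCalls };
}

collectCounter(10).then(({ values, readCalls }) => {
  console.log(values, `produced over ${readCalls} _read() calls`);
});
```

With a `highWaterMark` of 4 and 10 values, the numbers necessarily arrive over several `_read()` calls instead of being buffered all at once.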
Example: Reading from a Database Cursor
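```javascript
const { Readable } = require('stream');

// Note: db.query(...).cursor(), cursor.fetch(n) and cursor.close() are
// placeholders for whatever cursor API your database driver provides.
class DatabaseQueryStream extends Readable {
  constructor(db, query, options = {}) {
    super({ objectMode: true, ...options }); // objectMode for non-Buffer data
    this.cursor = db.query(query).cursor();
  }

  // Node.js does not await _read, so errors must be caught here
  async _read(size) {
    try {
      // In objectMode, size is a number of objects, not bytes
      const rows = await this.cursor.fetch(size);

      if (rows.length === 0) {
        await this.cursor.close();
        this.push(null); // No more rows
        return;
      }

      for (const row of rows) {
        this.push(row); // Push objects, not Buffers
      }
    } catch (err) {
      this.destroy(err);
    }
  }
}

// Usage (db is your connected database client)
const userStream = new DatabaseQueryStream(db, 'SELECT * FROM users');

userStream.on('data', (user) => {
  console.log('User:', user.name, user.email);
});

userStream.on('error', (err) => {
  console.error('Query failed:', err.message);
});
```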
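objectMode: By default, streams work with Buffers (and strings). Set `objectMode: true` to push arbitrary JavaScript objects. In object mode, `highWaterMark` (and the `size` argument passed to `_read`) counts objects rather than bytes; the default is 16.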
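To try the cursor pattern without a database, the cursor can be faked. In this sketch `FakeCursor` is a hypothetical stand-in for a driver cursor (its `fetch(n)` and `close()` methods are assumptions, not a real driver API); the stream itself relies only on `push()`, `push(null)` and `destroy(err)`:

```javascript
const { Readable } = require('stream');

// Hypothetical cursor: hands out rows in batches, asynchronously
class FakeCursor {
  constructor(rows) {
    this.rows = rows;
    this.closed = false;
  }
  async fetch(n) {
    await new Promise((resolve) => setImmediate(resolve)); // simulate I/O
    return this.rows.splice(0, n);
  }
  async close() {
    this.closed = true;
  }
}

class CursorStream extends Readable {
  constructor(cursor, options = {}) {
    super({ objectMode: true, ...options });
    this.cursor = cursor;
  }

  async _read(size) {
    try {
      // In objectMode, size is the highWaterMark counted in objects
      const rows = await this.cursor.fetch(size);
      if (rows.length === 0) {
        await this.cursor.close();
        this.push(null);
        return;
      }
      for (const row of rows) this.push(row);
    } catch (err) {
      this.destroy(err);
    }
  }
}

async function readAllUsers() {
  const cursor = new FakeCursor([{ name: 'Ada' }, { name: 'Linus' }, { name: 'Grace' }]);
  const names = [];
  for await (const user of new CursorStream(cursor, { highWaterMark: 2 })) {
    names.push(user.name);
  }
  return { names, closed: cursor.closed };
}

readAllUsers().then(({ names }) => console.log(names)); // [ 'Ada', 'Linus', 'Grace' ]
```

Setting `highWaterMark: 2` makes the stream fetch two rows per `_read()` call, which is a convenient way to check that batching works.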
2. Custom Writable Stream
A custom Writable processes data by implementing _write(chunk, encoding, callback).
Example: Line-by-Line File Writer with Timestamps
```javascript
const fs = require('fs');
const { Writable } = require('stream');

class TimestampedFileWriter extends Writable {
  constructor(filePath, options = {}) {
    super(options);
    this.stream = fs.createWriteStream(filePath, { flags: 'a' });
    this.lineCount = 0;
    // Forward errors from the underlying file stream (e.g. EACCES)
    this.stream.on('error', (err) => this.destroy(err));
  }

  // With the default decodeStrings: true, strings are converted to Buffers
  // before _write is called, so encoding is 'buffer'
  _write(chunk, encoding, callback) {
    const line = chunk.toString().trim();
    this.lineCount++;

    // Add timestamp prefix
    const timestamp = new Date().toISOString();
    const output = `[${timestamp}] Line ${this.lineCount}: ${line}\n`;

    // Write to file; callback runs once the chunk has been flushed
    this.stream.write(output, callback);
  }

  // Called after the last _write, before 'finish'
  _final(callback) {
    console.log(`Total lines written: ${this.lineCount}`);
    this.stream.end(callback);
  }
}

// Usage
const writer = new TimestampedFileWriter('output.log');

writer.on('finish', () => {
  console.log('All data written');
});

writer.write('First entry');
writer.write('Second entry');
writer.end('Last entry');
```
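To see where `_final()` sits relative to `_write()` and `'finish'`, a Writable can simply record the order in which things happen. A small sketch; `RecordingWriter` and its `log` array are illustrative names, and `setImmediate` only simulates asynchronous work:

```javascript
const { Writable } = require('stream');

class RecordingWriter extends Writable {
  constructor(log, options = {}) {
    super(options);
    this.log = log;
  }
  _write(chunk, encoding, callback) {
    this.log.push(`_write:${chunk.toString()}`);
    setImmediate(callback); // pretend each write is asynchronous
  }
  _final(callback) {
    this.log.push('_final');
    callback();
  }
}

function recordOrder() {
  const log = [];
  const writer = new RecordingWriter(log);
  return new Promise((resolve) => {
    writer.on('finish', () => {
      log.push('finish');
      resolve(log);
    });
    writer.write('a');
    writer.write('b');
    writer.end('c');
  });
}

recordOrder().then((log) => console.log(log));
// [ '_write:a', '_write:b', '_write:c', '_final', 'finish' ]
```

Writes are processed one at a time, `_final()` runs only after the last write's callback, and `'finish'` follows `_final()`'s callback.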
Understanding callback in _write
The callback is critical. It tells the writable stream that you've finished processing the chunk:
```javascript
// doSomethingAsync stands for any callback-style async operation
_write(chunk, encoding, callback) {
  doSomethingAsync(chunk, (err) => {
    if (err) {
      callback(err); // Propagate error: the stream will emit 'error'
    } else {
      callback(); // Signal: ready for next chunk
    }
  });
}
```
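If you never call callback(), the stream hangs: later chunks pile up in the buffer unprocessed, and neither 'drain' nor 'finish' is ever emitted. Call it exactly once per chunk; calling it twice makes the stream fail with ERR_MULTIPLE_CALLBACK.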
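Passing an error to `callback()` is how a Writable reports failure; `pipeline()` then tears the other streams down and hands the error to its final callback. A sketch in which a Writable rejects a record; the "reject the string 'bad'" rule and the helper names `makeStrictWriter` / `runStrict` are made up for illustration:

```javascript
const { Readable, Writable, pipeline } = require('stream');

function makeStrictWriter(accepted) {
  return new Writable({
    objectMode: true,
    write(record, encoding, callback) {
      if (record === 'bad') {
        callback(new Error(`rejected record: ${record}`)); // becomes 'error'
        return;
      }
      accepted.push(record);
      callback(); // ready for the next chunk
    },
  });
}

function runStrict(records) {
  const accepted = [];
  return new Promise((resolve) => {
    pipeline(Readable.from(records), makeStrictWriter(accepted), (err) => {
      resolve({ accepted, error: err ? err.message : null });
    });
  });
}

runStrict(['ok-1', 'bad', 'ok-2']).then(console.log);
// { accepted: [ 'ok-1' ], error: 'rejected record: bad' }
```

Once the error is reported, the Writable stops calling `write`, so `'ok-2'` is never processed.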
3. Custom Transform Stream
Transform streams modify data as it passes through. Implement _transform(chunk, encoding, callback).
Example: CSV to JSON Converter
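```javascript
const { Transform } = require('stream');

class CsvToJsonTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.headers = null;
    this.remainder = ''; // incomplete line carried over from the previous chunk
  }

  // Chunks from fs.createReadStream are arbitrary slices of the file,
  // not lines, so split on '\n' and keep the trailing partial line.
  // Assumes UTF-8 text (use encoding: 'utf8' on the source stream).
  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + chunk.toString()).split('\n');
    this.remainder = lines.pop();
    for (const line of lines) {
      this._processLine(line);
    }
    callback();
  }

  _processLine(rawLine) {
    const line = rawLine.trim();
    if (!line) return;

    const values = line.split(',');

    if (!this.headers) {
      // First line is the header
      this.headers = values.map(h => h.trim().toLowerCase());
      return;
    }

    // Convert to object and push it
    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header] = values[i]?.trim();
    });
    this.push(obj);
  }

  _flush(callback) {
    // The file may not end with a newline
    this._processLine(this.remainder);
    console.log('CSV parsing complete');
    callback();
  }
}
```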
```javascript
// Usage
const fs = require('fs');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('data.csv'),
  new CsvToJsonTransform(),
  new ConcatJsonTransform(), // Another custom transform (defined below)
  (err) => {
    if (err) console.error('Pipeline error:', err.message);
  },
);
```
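Wait: this pipeline has no Writable at the end, so the data never reaches a file (and once the last stream's output buffer fills up, the pipeline can stall). Before adding `fs.createWriteStream()`, the objects have to be turned back into text, because a file stream only accepts strings and Buffers. Let's create a Transform that serializes them as a JSON array: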
```javascript
class ConcatJsonTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.isFirst = true;
    this.pushedArrayStart = false;
  }

  _transform(obj, encoding, callback) {
    if (!this.pushedArrayStart) {
      this.push('[\n');
      this.pushedArrayStart = true;
    }
    if (!this.isFirst) {
      this.push(',\n');
    }
    this.isFirst = false;
    this.push(JSON.stringify(obj, null, 2));
    callback();
  }

  _flush(callback) {
    // Still emit a valid (empty) array if no objects arrived
    if (!this.pushedArrayStart) {
      this.push('[');
    }
    this.push('\n]\n');
    callback();
  }
}

// Now the full pipeline
pipeline(
  fs.createReadStream('data.csv', { encoding: 'utf8' }),
  new CsvToJsonTransform(),
  new ConcatJsonTransform(),
  fs.createWriteStream('output.json'),
  (err) => {
    if (err) return console.error('Pipeline failed:', err.message);
    console.log('CSV to JSON conversion complete');
  },
);
```
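Because file chunks do not line up with CSV lines, it is worth testing the conversion with input that splits a row across two chunks. A compact in-memory sketch under these assumptions: simple comma-separated values with no quoted fields, Node.js 15+ for `stream/promises`, and `csvToObjects` / `convert` as illustrative helper names:

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Minimal CSV-to-object transform that buffers partial lines between chunks.
// Assumes no quoted fields and no commas inside values.
function csvToObjects() {
  let headers = null;
  let pending = '';
  const handleLine = (stream, line) => {
    if (!line.trim()) return;
    const values = line.split(',').map((v) => v.trim());
    if (!headers) {
      headers = values.map((h) => h.toLowerCase());
      return;
    }
    const obj = {};
    headers.forEach((h, i) => { obj[h] = values[i]; });
    stream.push(obj);
  };
  return new Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
      pending += chunk.toString();
      const lines = pending.split('\n');
      pending = lines.pop(); // keep the incomplete last line
      for (const line of lines) handleLine(this, line);
      callback();
    },
    flush(callback) {
      handleLine(this, pending); // input may not end with '\n'
      callback();
    },
  });
}

async function convert(chunks) {
  const rows = [];
  await pipeline(
    Readable.from(chunks),
    csvToObjects(),
    new Writable({
      objectMode: true,
      write(row, encoding, callback) {
        rows.push(row);
        callback();
      },
    }),
  );
  return JSON.stringify(rows);
}

// The second data row is split across two chunks on purpose
convert(['Name,Age\nAda,36\nLin', 'us,54\n']).then(console.log);
// [{"name":"Ada","age":"36"},{"name":"Linus","age":"54"}]
```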
Example: Progress Reporting Transform
```javascript
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises'); // Node.js 15+

class ProgressTransform extends Transform {
  constructor(totalSize, options = {}) {
    super(options);
    this.totalSize = totalSize;
    this.processed = 0;
    this.lastStep = -1; // last 5% step drawn (0..20)
  }

  _transform(chunk, encoding, callback) {
    this.processed += chunk.length;
    const percent = this.totalSize > 0
      ? Math.min(100, Math.floor((this.processed / this.totalSize) * 100))
      : 100;
    const step = Math.floor(percent / 5);

    // Redraw on every new 5% step, even if a large chunk skipped over one
    if (step !== this.lastStep) {
      const bar = '█'.repeat(step) + '░'.repeat(20 - step);
      // process.stdout.write (unlike console.log) lets \r redraw the same line
      process.stdout.write(`\r${bar} ${percent}%`);
      this.lastStep = step;
    }

    this.push(chunk); // Pass through unchanged
    callback();
  }
}

// Usage: show progress during file copy
async function copyWithProgress(src, dest) {
  const stats = await fs.promises.stat(src);
  await pipeline(
    fs.createReadStream(src),
    new ProgressTransform(stats.size),
    fs.createWriteStream(dest),
  );
  console.log('\nCopy complete!');
}

copyWithProgress('large-file.zip', 'large-file-copy.zip')
  .catch((err) => console.error('\nCopy failed:', err.message));
```
4. Custom Duplex Stream
Duplex streams have both a readable and a writable side. The two sides are independent.
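To make that independence concrete: in this sketch the writable side only counts what it receives, while the readable side emits its own, unrelated data. `IndependentDuplex` and its two greeting words are illustrative:

```javascript
const { Duplex } = require('stream');

class IndependentDuplex extends Duplex {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.received = 0;                  // writable-side state
    this.outgoing = ['hello', 'world']; // readable-side state
  }

  _write(chunk, encoding, callback) {
    this.received++; // nothing written here ever reaches the readable side
    callback();
  }

  _read() {
    const next = this.outgoing.shift();
    this.push(next === undefined ? null : next);
  }
}

async function demoDuplex() {
  const duplex = new IndependentDuplex();
  duplex.write('x');
  duplex.write('y');
  duplex.end('z');
  const read = [];
  for await (const item of duplex) read.push(item);
  return { read, received: duplex.received };
}

demoDuplex().then(console.log); // { read: [ 'hello', 'world' ], received: 3 }
```

A socket works the same way: what you write goes to the peer, and what you read comes from it.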
Example: In-Memory Queue
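```javascript
const { Duplex } = require('stream');

class QueueStream extends Duplex {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.queue = [];
    this.readerWaiting = false; // true after _read until push() returns false
    this.writableDone = false;
  }

  // Writable side: called when data is written
  _write(chunk, encoding, callback) {
    this.queue.push(chunk);
    this._maybePush(); // Notify the readable side that data is available
    callback();
  }

  // Called once the writable side has ended: end the readable side too
  _final(callback) {
    this.writableDone = true;
    this._maybePush();
    callback();
  }

  // Readable side: called when the consumer wants data
  _read(size) {
    this.readerWaiting = true;
    this._maybePush();
  }

  _maybePush() {
    // Only push while the consumer is asking for data (respects backpressure)
    while (this.readerWaiting && this.queue.length > 0) {
      this.readerWaiting = this.push(this.queue.shift());
    }
    // Once writing has ended and the queue is drained, signal end of data
    if (this.writableDone && this.queue.length === 0) {
      this.push(null);
    }
  }
}

// Usage
const queue = new QueueStream();

queue.on('data', (item) => {
  console.log('Read from queue:', item);
});

queue.on('end', () => {
  console.log('Queue drained');
});

queue.write('First');
queue.write('Second');
queue.write('Third');
queue.end();

// Output:
// Read from queue: First
// Read from queue: Second
// Read from queue: Third
// Queue drained
```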
5. The _flush Method (Transform)
_flush is called once all written data has passed through _transform, just before the readable side ends. Use it to push any data you are still holding:
```javascript
const { Transform } = require('stream');

// Holds every chunk in memory until the end, so only use it for small inputs
class ChunkCollector extends Transform {
  constructor(options = {}) {
    super(options);
    this.buffer = [];
  }

  _transform(chunk, encoding, callback) {
    this.buffer.push(chunk);
    callback();
  }

  _flush(callback) {
    // Concatenate all chunks and push as one
    const combined = Buffer.concat(this.buffer);
    this.push(combined);
    callback();
  }
}
```
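A runnable variant that makes the timing visible: each chunk passes straight through, and `_flush()` appends a one-line summary only after the input has ended. `SummaryTransform`, `summarize` and the `total: N` format are illustrative; `stream/promises` assumes Node.js 15+:

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

class SummaryTransform extends Transform {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.count = 0;
  }
  _transform(chunk, encoding, callback) {
    this.count++;
    callback(null, chunk); // pass each chunk through unchanged
  }
  _flush(callback) {
    // Runs once, after the last _transform call
    this.push(`total: ${this.count}`);
    callback();
  }
}

async function summarize(items) {
  const out = [];
  await pipeline(
    Readable.from(items),
    new SummaryTransform(),
    new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        out.push(chunk);
        callback();
      },
    }),
  );
  return out;
}

summarize(['a', 'b', 'c']).then(console.log); // [ 'a', 'b', 'c', 'total: 3' ]
```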
Complete Example: Line Reader Transform
```javascript
const fs = require('fs');
const { StringDecoder } = require('string_decoder');
const { Transform, Writable, pipeline } = require('stream');

class LineSplitter extends Transform {
  constructor(options = {}) {
    // Emit each line as its own string chunk (otherwise lines are
    // re-encoded as Buffers and empty lines are dropped)
    super({ ...options, readableObjectMode: true });
    this.buffer = ''; // Hold incomplete lines
    // Decodes multi-byte UTF-8 characters that are split across chunks
    this.decoder = new StringDecoder('utf8');
  }

  _transform(chunk, encoding, callback) {
    this.buffer += this.decoder.write(chunk);
    const lines = this.buffer.split('\n');

    // The last element might be incomplete: keep it for the next chunk
    this.buffer = lines.pop();

    for (const line of lines) {
      this.push(line);
    }
    callback();
  }

  _flush(callback) {
    // Push any remaining data
    this.buffer += this.decoder.end();
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}

// Usage: process a large log file line by line
pipeline(
  fs.createReadStream('huge-log.log', { encoding: 'utf8', highWaterMark: 65536 }),
  new LineSplitter(),
  // Final Writable that only logs error lines
  new Writable({
    objectMode: true,
    write(line, encoding, callback) {
      if (line.includes('ERROR')) {
        console.log('ERROR:', line);
      }
      callback();
    },
  }),
  (err) => { if (err) console.error('Error:', err); },
);
```
Key Takeaways
- Implement `_read()` for Readable: push data with `this.push(data)`, signal the end with `this.push(null)`
- Implement `_write(chunk, enc, callback)` for Writable: always call `callback()` when done
- Implement `_transform(chunk, enc, callback)` for Transform: push transformed data, then call `callback`
- Implement `_flush(callback)` (optional) for Transform: called just before the stream ends
- `objectMode: true` lets you push JavaScript objects instead of Buffers
- `callback()` is the stream's flow-control mechanism: never forget to call it
- Prefer `pipeline()` over `pipe()` when working with custom streams: it forwards errors and destroys every stream in the chain if one fails
- Custom streams are created by extending `Readable`, `Writable`, `Duplex`, or `Transform`
- `_final(callback)` is the Writable equivalent of `_flush`: called before `'finish'`
- Test custom streams by piping to/from known streams (`process.stdout`, `fs.createWriteStream`)
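For automated tests, an in-memory harness is often easier than real files or `process.stdout`: feed an array through the stream under test and collect what comes out. A sketch assuming Node.js 15+ for `stream/promises`; `runThrough` is a hypothetical helper name, and the upper-casing Transform is only a stand-in for the stream being tested:

```javascript
const { Readable, Writable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: pushes `inputs` through `streamUnderTest`
// and resolves with every chunk that comes out the other end
async function runThrough(streamUnderTest, inputs) {
  const outputs = [];
  await pipeline(
    Readable.from(inputs),
    streamUnderTest,
    new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        // Byte-mode streams emit Buffers; convert them for easy comparison
        outputs.push(Buffer.isBuffer(chunk) ? chunk.toString() : chunk);
        callback();
      },
    }),
  );
  return outputs;
}

// Stand-in for the stream being tested
const upperCase = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).toUpperCase());
  },
});

runThrough(upperCase, ['foo', 'bar']).then(console.log); // [ 'FOO', 'BAR' ]
```

Because each call builds a fresh source and sink, the helper can be reused across test cases; only the stream under test has to be created anew each time.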