Streams Are Everywhere in Node.js
The two most common places you encounter streams in Node.js are:
fsβ reading and writing fileshttpβ sending and receiving HTTP requests
Once you understand streams in these modules, you understand streams everywhere.
File System Streams
fs.createReadStream
The most common way to read files with streams:
const fs = require('fs');
const readStream = fs.createReadStream('large-file.txt', {
flags: 'r', // Open for reading (default)
encoding: 'utf8', // Get strings, not Buffers
fd: null, // File descriptor (null = open by path)
mode: 0o666, // File permissions (used if creating)
autoClose: true, // Close file descriptor on error/end
emitClose: true, // Emit 'close' event
start: 0, // Start reading at byte 0
end: Infinity, // Read until this byte (inclusive)
highWaterMark: 65536, // Read 64KB chunks (default)
});
Reading a Specific Byte Range
// Read bytes 100-199 of a file
const stream = fs.createReadStream('file.txt', {
start: 100,
end: 199,
encoding: 'utf8',
});
let data = '';
stream.on('data', (chunk) => { data += chunk; });
stream.on('end', () => console.log('Bytes 100-199:', data));
fs.createWriteStream
const writeStream = fs.createWriteStream('output.txt', {
flags: 'w', // 'w' = overwrite, 'a' = append
encoding: 'utf8',
fd: null,
mode: 0o666,
autoClose: true,
emitClose: true,
start: 0, // Start writing at byte offset
highWaterMark: 16384, // 16KB internal buffer
});
Streaming File Copy
The classic use case β copy a file without buffering it entirely in memory:
const fs = require('fs');
const { pipeline } = require('stream/promises');
async function copyFile(src, dest) {
await pipeline(
fs.createReadStream(src),
fs.createWriteStream(dest),
);
console.log(`Copied ${src} β ${dest}`);
}
// Copy a 10GB file using ~64KB of RAM
await copyFile('/mnt/data/backup.sql', '/backup/backup-copy.sql');
Streaming File Hash (No Buffering)
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream/promises');
async function getFileHash(filePath) {
const hash = crypto.createHash('sha256');
await pipeline(
fs.createReadStream(filePath),
hash, // Transform stream β writes hash as data flows
);
// Set the encoding on the hash AFTER the pipeline
console.log('SHA256:', hash.digest('hex'));
}
await getFileHash('/downloads/ubuntu.iso'); // Works for any file size
HTTP Streams
Streaming an HTTP Response to a File
When you download a file via HTTP, the response is a Readable stream. Pipe it directly to disk:
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');
async function download(url, destPath) {
return new Promise((resolve, reject) => {
http.get(url, async (response) => {
// Check status code
if (response.statusCode !== 200) {
reject(new Error(`HTTP ${response.statusCode}`));
return;
}
// Get file size from headers (for progress)
const totalSize = parseInt(response.headers['content-length'], 10);
console.log(`Downloading ${(totalSize / 1024 / 1024).toFixed(1)} MB`);
try {
await pipeline(
response,
fs.createWriteStream(destPath),
);
console.log('Download complete:', destPath);
resolve();
} catch (err) {
reject(err);
}
}).on('error', reject);
});
}
await download('http://example.com/large-file.zip', './download.zip');
Streaming a File as an HTTP Response (Server)
The reverse β send a file from disk over HTTP:
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
const mime = require('mime-types'); // npm install mime-types
const server = http.createServer((req, res) => {
if (req.url.startsWith('/files/')) {
const filePath = req.url.slice(7); // Remove '/files/'
// Check if file exists first (stat)
fs.stat(filePath, (err, stats) => {
if (err) {
res.writeHead(404);
res.end('File not found');
return;
}
// Set headers
res.writeHead(200, {
'Content-Type': mime.lookup(filePath) || 'application/octet-stream',
'Content-Length': stats.size,
'Content-Disposition': `attachment; filename="${require('path').basename(filePath)}"`,
});
// Stream the file β never buffers in RAM
pipeline(
fs.createReadStream(filePath),
res,
(pipelineErr) => {
if (pipelineErr) {
console.error('Stream error:', pipelineErr.message);
if (!res.headersSent) {
res.writeHead(500);
}
res.end();
}
},
);
});
}
});
server.listen(3000);
Streaming HTTP Request Body (File Upload)
When a client uploads a file, the request body arrives as a stream. Save it directly to disk:
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
const server = http.createServer((req, res) => {
if (req.method === 'POST' && req.url === '/upload') {
const filename = `upload-${Date.now()}.dat`;
pipeline(
req, // IncomingMessage is a Readable stream
fs.createWriteStream(`./uploads/${filename}`),
(err) => {
if (err) {
console.error('Upload failed:', err.message);
res.writeHead(500);
res.end('Upload failed');
return;
}
console.log('File uploaded:', filename);
res.writeHead(201);
res.end(`Uploaded: ${filename}`);
},
);
}
});
Compression with Streams
Gzip an HTTP Response on the Fly
Compress large responses before sending β dramatically reduces bandwidth:
const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');
const server = http.createServer((req, res) => {
if (req.url === '/api/report') {
// Check if client supports gzip
const acceptsGzip = req.headers['accept-encoding']?.includes('gzip');
if (acceptsGzip) {
res.writeHead(200, {
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
});
pipeline(
fs.createReadStream('./large-report.json'),
zlib.createGzip(),
res,
(err) => {
if (err) console.error('Compression stream error:', err.message);
},
);
} else {
// Send uncompressed
pipeline(
fs.createReadStream('./large-report.json'),
res,
(err) => {
if (err) console.error('Stream error:', err.message);
},
);
}
}
});
Compress on Write, Decompress on Read
const zlib = require('zlib');
const fs = require('fs');
const { pipeline } = require('stream/promises');
// Write compressed data
async function writeCompressed(filePath, data) {
const source = require('stream').Readable.from([data]);
await pipeline(
source,
zlib.createGzip(),
fs.createWriteStream(filePath),
);
}
// Read compressed data
async function readCompressed(filePath) {
const chunks = [];
await pipeline(
fs.createReadStream(filePath),
zlib.createGunzip(),
async function* (source) {
for await (const chunk of source) {
chunks.push(chunk);
}
},
);
return Buffer.concat(chunks).toString();
}
// Usage
await writeCompressed('data.json.gz', JSON.stringify({ users: 1000 }));
const data = await readCompressed('data.json.gz');
console.log('Recovered:', data);
Encryption with Streams
const crypto = require('crypto');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const algorithm = 'aes-256-gcm';
const key = crypto.randomBytes(32);
async function encryptFile(inputPath, outputPath) {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, key, iv);
// Write IV first (needed for decryption)
await fs.promises.writeFile(outputPath + '.iv', iv);
await pipeline(
fs.createReadStream(inputPath),
cipher,
fs.createWriteStream(outputPath),
);
console.log('File encrypted');
}
async function decryptFile(inputPath, outputPath) {
const iv = await fs.promises.readFile(inputPath + '.iv');
const decipher = crypto.createDecipheriv(algorithm, key, iv);
await pipeline(
fs.createReadStream(inputPath),
decipher,
fs.createWriteStream(outputPath),
);
console.log('File decrypted');
}
Practical: Streaming HTTP Proxy
A proxy that streams data without buffering β memory usage stays low regardless of file size:
const http = require('http');
const { pipeline } = require('stream');
const server = http.createServer((clientReq, clientRes) => {
const url = new URL(clientReq.url);
const options = {
hostname: url.hostname,
port: url.port || 80,
path: url.pathname + url.search,
method: clientReq.method,
headers: clientReq.headers,
};
// Forward the request to the target server
const proxyReq = http.request(options, (proxyRes) => {
// Forward the response headers
clientRes.writeHead(proxyRes.statusCode, proxyRes.headers);
// Stream the response body β no buffering
pipeline(proxyRes, clientRes, (err) => {
if (err) console.error('Proxy response error:', err.message);
});
});
// Stream the client request body to the proxy
pipeline(clientReq, proxyReq, (err) => {
if (err) console.error('Proxy request error:', err.message);
});
});
server.listen(8080);
console.log('HTTP proxy listening on :8080');
Practical: Log Rotator with Streams
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');
const path = require('path');
class LogRotator {
constructor(logPath, maxSize = 100 * 1024 * 1024) {
this.logPath = logPath;
this.maxSize = maxSize; // 100MB default
this.writeStream = null;
this.currentSize = 0;
}
async init() {
try {
const stats = await fs.promises.stat(this.logPath);
this.currentSize = stats.size;
} catch {
this.currentSize = 0;
}
this.writeStream = fs.createWriteStream(this.logPath, { flags: 'a' });
}
async write(line) {
const data = line + '\n';
this.writeStream.write(data);
this.currentSize += Buffer.byteLength(data);
if (this.currentSize >= this.maxSize) {
await this.rotate();
}
}
async rotate() {
// Close current file
await new Promise((resolve) => this.writeStream.end(resolve));
// Compress the old log
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const archivePath = `${this.logPath}.${timestamp}.gz`;
await pipeline(
fs.createReadStream(this.logPath),
zlib.createGzip(),
fs.createWriteStream(archivePath),
);
// Truncate the original
await fs.promises.truncate(this.logPath, 0);
this.currentSize = 0;
this.writeStream = fs.createWriteStream(this.logPath, { flags: 'a' });
console.log(`Log rotated: ${archivePath}`);
}
}
Practical: CSV to JSON Converter (Streaming)
const fs = require('fs');
const { Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');
class CsvParser extends Transform {
constructor() {
super({ objectMode: true });
this.buffer = '';
this.headers = null;
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // Keep incomplete line
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',').map(v => v.trim());
if (!this.headers) {
this.headers = values;
continue;
}
const obj = {};
this.headers.forEach((h, i) => { obj[h] = values[i]; });
this.push(JSON.stringify(obj) + '\n');
}
callback();
}
_flush(callback) {
if (this.buffer.trim()) {
const values = this.buffer.split(',').map(v => v.trim());
const obj = {};
this.headers.forEach((h, i) => { obj[h] = values[i]; });
this.push(JSON.stringify(obj) + '\n');
}
callback();
}
}
// Convert a 5GB CSV to NDJSON using ~64KB of RAM
await pipeline(
fs.createReadStream('users.csv', { encoding: 'utf8' }),
new CsvParser(),
fs.createWriteStream('users.ndjson'),
);
console.log('Conversion complete');
Key Takeaways
fs.createReadStreamandfs.createWriteStreamare the primary stream APIs for file I/O- Use
start/endoptions to read specific byte ranges from a file - HTTP
req(IncomingMessage) is a Readable stream β pipe incoming uploads directly to disk - HTTP
res(ServerResponse) is a Writable stream β pipe files directly to the response Content-Lengthshould be set when streaming known-sized filesContent-Encoding: gzipenables on-the-fly compression β pair withzlib.createGzip()- Always use
pipeline()instead ofpipe()β error handling is critical in production - Streaming a file through
cryptoandzlibtransforms encrypts/compresses without buffering - A proxy with
pipeline(clientReq, proxyReq)andpipeline(proxyRes, clientRes)forwards data with zero buffering - Log rotators using streams can compress, archive, and truncate without holding the file in memory
- Any data pipeline (CSVβJSON, encryptβcompress, downloadβsave) can be built with streams using minimal memory