What pipe() Does
pipe() is the simplest way to connect a readable stream to a writable stream. It:
- Reads data from the readable stream
- Writes it to the writable stream
- Manages backpressure automatically β pauses the source when the sink is full
- Calls
end()on the writable when the readable ends
readable.pipe(writable);
// That one line replaces all of this:
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) readable.pause();
});
writable.on('drain', () => readable.resume());
readable.on('end', () => writable.end());
Basic Pipe: File Copy
const fs = require('fs');
// Copy a file using pipe β uses ~64KB of RAM regardless of file size
const source = fs.createReadStream('source.zip');
const dest = fs.createWriteStream('dest.zip');
source.pipe(dest);
dest.on('finish', () => {
console.log('File copied successfully');
});
source.on('error', (err) => console.error('Source error:', err));
dest.on('error', (err) => console.error('Dest error:', err));
Pipe Chaining
You can chain multiple transforms between a source and destination:
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
// Chain: read file β encrypt β compress β write
const source = fs.createReadStream('secret.txt');
const encrypt = crypto.createCipher('aes-256-cbc', 'my-password');
const compress = zlib.createGzip();
const dest = fs.createWriteStream('secret.txt.gz.enc');
source
.pipe(encrypt) // Transform: encrypt each chunk
.pipe(compress) // Transform: compress encrypted data
.pipe(dest); // Writable: save to disk
dest.on('finish', () => console.log('Encrypted and compressed'));
Data flow:
ββββββββββ ββββββββββββ ββββββββββ ββββββββββ
β File ββββΊβ Encrypt ββββΊβ CompressββββΊβ Disk β
β (read) β β(cipher) β β (gzip) β β (write)β
ββββββββββ ββββββββββββ ββββββββββ ββββββββββ
The Problem with pipe() β Unhandled Errors
The biggest gotcha with pipe(): errors on any stream in the chain will not propagate automatically. A single error can cause a memory leak or a hanging process:
// β BAD β unhandled errors
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');
source.pipe(gzip).pipe(dest);
// If source emits 'error' β the stream closes
// But gzip and dest remain open! Memory leak.
// If dest's disk is full β dest errors, but source keeps reading.
The Fix: Destroy All Streams on Error
// β
GOOD β handle errors on each stream
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');
function handleError(err) {
console.error('Stream error:', err.message);
source.destroy();
gzip.destroy();
dest.destroy();
}
source.on('error', handleError);
gzip.on('error', handleError);
dest.on('error', handleError);
source.pipe(gzip).pipe(dest);
dest.on('finish', () => console.log('Success'));
pipeline() β The Modern Alternative
Node.js 10+ introduced stream.pipeline() which handles error propagation and cleanup automatically:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// pipeline: source β ...transforms... β destination
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err.message);
} else {
console.log('Pipeline completed successfully');
}
}
);
pipeline β Promise Version
const { pipeline } = require('stream/promises');
async function compressFile(input, output) {
try {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output),
);
console.log('Compression complete');
} catch (err) {
console.error('Compression failed:', err.message);
}
}
Always use
pipelineinstead ofpipe.pipelinecorrectly destroys all streams on error, preventing memory leaks and hanging processes.pipedoes not.
Why pipeline Is Safer
pipe() error flow: pipeline() error flow:
source βββΊ gzip βββΊ dest source βββΊ gzip βββΊ dest
β β β β
βββ error ββββββββββββ βββ error ββββββββββββ
β gzip still open β ALL destroyed
β dest still open β callback called
β memory leak
Piping HTTP Requests (Real-World Example)
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
const zlib = require('zlib');
const server = http.createServer((req, res) => {
if (req.url === '/download') {
// Stream a compressed file to the client
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip',
});
pipeline(
fs.createReadStream('./large-data.json'),
zlib.createGzip(),
res,
(err) => {
if (err) {
console.error('Stream failed:', err.message);
if (!res.headersSent) {
res.writeHead(500);
}
res.end();
}
},
);
}
});
Piping as the Client
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
// Download a file and save it to disk
http.get('http://example.com/large-file.zip', (response) => {
pipeline(
response, // IncomingMessage (Readable)
fs.createWriteStream('./downloaded.zip'),
(err) => {
if (err) {
console.error('Download failed:', err.message);
} else {
console.log('Download complete');
}
},
);
});
The unpipe() Method
Remove a pipe connection:
const source = fs.createReadStream('data.txt');
const dest1 = fs.createWriteStream('copy1.txt');
const dest2 = fs.createWriteStream('copy2.txt');
// Pipe to both (remember to manage backpressure manually for multiple destinations)
source.pipe(dest1);
// Stop piping to dest1
source.unpipe(dest1);
// Pipe to dest2 instead
source.pipe(dest2);
Duplex Piping (Sockets)
Duplex streams (like TCP sockets) can be piped in both directions:
const net = require('net');
const { pipeline } = require('stream');
// Echo server with bidirectional piping
net.createServer((socket) => {
pipeline(
socket, // Readable side of socket
socket, // Writable side of socket (echo back)
(err) => {
if (err) console.error('Socket error:', err.message);
},
);
}).listen(3000);
// Client side
const client = net.connect(3000);
client.write('Hello');
client.on('data', (data) => console.log('Echo:', data.toString()));
Practical: Log Archiver
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');
const path = require('path');
class LogArchiver {
constructor(logDir, archiveDir) {
this.logDir = logDir;
this.archiveDir = archiveDir;
}
async archiveLogFile(filename) {
const source = path.join(this.logDir, filename);
const target = path.join(this.archiveDir, `${filename}.gz`);
console.log(`Archiving ${filename}...`);
try {
await pipeline(
fs.createReadStream(source),
zlib.createGzip({ level: 9 }),
fs.createWriteStream(target),
);
// Remove original after successful archive
await fs.promises.unlink(source);
console.log(`Archived: ${filename} (${target})`);
} catch (err) {
console.error(`Failed to archive ${filename}:`, err.message);
}
}
async archiveAll() {
const files = await fs.promises.readdir(this.logDir);
const logFiles = files.filter(f => f.endsWith('.log'));
for (const file of logFiles) {
await this.archiveLogFile(file);
}
console.log(`Archived ${logFiles.length} log files`);
}
}
When NOT to Use pipe()
| Scenario | Why Not pipe? | Alternative |
|---|---|---|
| Need to modify data per chunk | pipe passes through unchanged | Use Transform stream in the pipe chain |
| Need to control flow precisely | pipe manages backpressure automatically | Manual 'data' / 'drain' handling |
| Single writable, many sources | pipe expects one-to-one | Multiple write() calls |
| Need to inspect each chunk | pipe is fire-and-forget | Transform with logging |
Key Takeaways
pipe()connects readable β writable with automatic backpressure management- Pipe chaining lets you compose: source β transform β transform β dest
pipe()does NOT propagate errors β an error on one stream can leak memory- Always use
pipeline()instead ofpipe()β it handles errors and cleanup correctly pipeline()has a callback (Node 10+) and a promise version (Node 15+ viastream/promises)unpipe()removes a pipe connection from a stream- For HTTP, pipe the response into a file write stream to download, or pipe a file read stream into the response to upload
- For multiple destinations, you must manage backpressure manually or use
teepatterns - When something goes wrong in a pipe chain, destroy all streams or use
pipeline()