Using pipe(): Connecting Streams · Astro Tech Blog

What pipe() Does

pipe() is the simplest way to connect a readable stream to a writable stream. It:

  1. Reads data from the readable stream
  2. Writes it to the writable stream
  3. Manages backpressure automatically: pauses the source when the sink is full
  4. Calls end() on the writable when the readable ends

readable.pipe(writable);
// That one line replaces all of this:
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) readable.pause();
});
writable.on('drain', () => readable.resume());
readable.on('end', () => writable.end());
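The backpressure signal that pipe() reacts to can be observed directly. The sketch below is only an illustration: createSlowSink is a hypothetical helper, and the 4-byte highWaterMark is an artificially small value chosen so the buffer fills after two writes.

```javascript
const { Writable } = require('stream');

// Hypothetical slow sink: a tiny 4-byte buffer, each write completes asynchronously.
function createSlowSink() {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      setImmediate(callback);
    },
  });
}

const sink = createSlowSink();
const firstOk = sink.write('ab');    // 2 bytes buffered, still below the 4-byte limit
const secondOk = sink.write('cdef'); // 6 bytes buffered, limit reached

console.log(firstOk, secondOk); // true false

// 'drain' fires once the buffered data has been flushed; this is when pipe() resumes the source.
sink.once('drain', () => console.log('drained, safe to write again'));
```

A false return value does not mean the chunk was dropped. It was buffered, and the writer is being asked to wait for 'drain' before sending more.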

Basic Pipe: File Copy

const fs = require('fs');

// Copy a file using pipe: uses ~64KB of RAM regardless of file size
const source = fs.createReadStream('source.zip');
const dest = fs.createWriteStream('dest.zip');

source.pipe(dest);

dest.on('finish', () => {
  console.log('File copied successfully');
});

source.on('error', (err) => console.error('Source error:', err));
dest.on('error', (err) => console.error('Dest error:', err));
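When the copy is one step in a larger async flow, the same events can be wrapped in a promise. This is a minimal sketch rather than a recommended utility: copyFile and demoCopy are hypothetical names, and the demo writes throwaway files to the OS temp directory.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: copy a file with pipe() and settle a promise when it is done.
function copyFile(sourcePath, destPath) {
  return new Promise((resolve, reject) => {
    const source = fs.createReadStream(sourcePath);
    const dest = fs.createWriteStream(destPath);
    source.on('error', reject);
    dest.on('error', reject);
    dest.on('finish', resolve); // all data has been handed to the file system
    source.pipe(dest);
  });
}

// Usage sketch: copy a 200 KB file (several 64 KB chunks) and compare the bytes.
async function demoCopy() {
  const dir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'pipe-copy-'));
  const from = path.join(dir, 'source.bin');
  const to = path.join(dir, 'dest.bin');
  await fs.promises.writeFile(from, Buffer.alloc(200 * 1024, 7));
  await copyFile(from, to);
  const original = await fs.promises.readFile(from);
  const copy = await fs.promises.readFile(to);
  await fs.promises.rm(dir, { recursive: true, force: true });
  return original.equals(copy);
}

demoCopy().then((same) => console.log('Copy matches source:', same));
```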

Pipe Chaining

You can chain multiple transforms between a source and destination:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

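// Chain: read file → encrypt → compress → write
// crypto.createCipher() is deprecated (and removed in recent Node.js releases); use createCipheriv()
const key = crypto.scryptSync('my-password', 'example-salt', 32); // 32-byte AES-256 key (placeholder salt)
const iv = crypto.randomBytes(16); // keep the IV with the output; decryption needs it
const source = fs.createReadStream('secret.txt');
const encrypt = crypto.createCipheriv('aes-256-cbc', key, iv);
const compress = zlib.createGzip();
const dest = fs.createWriteStream('secret.txt.gz.enc');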

source
  .pipe(encrypt)    // Transform: encrypt each chunk
  .pipe(compress)   // Transform: compress encrypted data
  .pipe(dest);      // Writable: save to disk

dest.on('finish', () => console.log('Encrypted and compressed'));
Data flow:
┌────────┐   ┌──────────┐   ┌─────────┐   ┌────────┐
│  File  │──►│ Encrypt  │──►│ Compress│──►│  Disk  │
│ (read) │   │ (cipher) │   │ (gzip)  │   │ (write)│
└────────┘   └──────────┘   └─────────┘   └────────┘
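Reading the data back means running the chain in reverse order: decompress first, then decrypt. The sketch below does the round trip in memory so it can run without any files. runChain and roundTrip are hypothetical helpers, the key and IV are random demo values, and it uses crypto.createCipheriv() / createDecipheriv() with an explicit key and IV.

```javascript
const crypto = require('crypto');
const zlib = require('zlib');
const { Readable, Writable } = require('stream');

// Hypothetical helper: push input through the given transforms with pipe() and collect the output.
function runChain(input, transforms) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const sink = new Writable({
      write(chunk, encoding, callback) {
        chunks.push(chunk);
        callback();
      },
    });
    let stream = Readable.from([input]);
    for (const next of [...transforms, sink]) {
      stream.on('error', reject); // pipe() does not forward errors, so listen on every stage
      stream = stream.pipe(next);
    }
    sink.on('error', reject);
    sink.on('finish', () => resolve(Buffer.concat(chunks)));
  });
}

async function roundTrip(text) {
  const key = crypto.randomBytes(32); // demo key for AES-256
  const iv = crypto.randomBytes(16);  // demo initialization vector

  // Forward chain: encrypt, then compress (same order as the diagram)
  const packed = await runChain(Buffer.from(text), [
    crypto.createCipheriv('aes-256-cbc', key, iv),
    zlib.createGzip(),
  ]);

  // Reverse chain: decompress, then decrypt
  const unpacked = await runChain(packed, [
    zlib.createGunzip(),
    crypto.createDecipheriv('aes-256-cbc', key, iv),
  ]);
  return unpacked.toString();
}

roundTrip('hello streams').then((text) => console.log(text));
```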

The Problem with pipe(): Unhandled Errors

The biggest gotcha with pipe(): errors on any stream in the chain will not propagate automatically. A single error can cause a memory leak or a hanging process:

// ❌ BAD: unhandled errors
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');

source.pipe(gzip).pipe(dest);

// If source emits 'error' → source closes,
// but gzip and dest remain open! Memory leak.
// If dest's disk is full → dest errors, but source keeps reading.
// With no 'error' listener attached, the error is thrown as an uncaught exception.

The Fix: Destroy All Streams on Error

// ✅ GOOD: handle errors on each stream
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');

function handleError(err) {
  console.error('Stream error:', err.message);
  source.destroy();
  gzip.destroy();
  dest.destroy();
}

source.on('error', handleError);
gzip.on('error', handleError);
dest.on('error', handleError);

source.pipe(gzip).pipe(dest);

dest.on('finish', () => console.log('Success'));

pipeline(): The Modern Alternative

Node.js 10 introduced stream.pipeline(), which handles error propagation and cleanup automatically:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// pipeline: source → ...transforms... → destination
pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err.message);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

pipeline: Promise Version

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output),
    );
    console.log('Compression complete');
  } catch (err) {
    console.error('Compression failed:', err.message);
  }
}

Always use pipeline instead of pipe. pipeline correctly destroys all streams on error, preventing memory leaks and hanging processes. pipe does not.
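The promise version is not limited to files. As a rough sketch, the hypothetical helper gzipString below compresses a string in memory and collects the result with a small Writable, which makes the behavior easy to check without touching the disk.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Hypothetical helper: gzip a string in memory and resolve with the compressed Buffer.
async function gzipString(text) {
  const chunks = [];
  const collector = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });

  // Rejects if any stage fails, after destroying every stream in the chain.
  await pipeline(Readable.from([text]), zlib.createGzip(), collector);
  return Buffer.concat(chunks);
}

gzipString('hello pipeline').then((compressed) => {
  console.log(zlib.gunzipSync(compressed).toString());
});
```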

Why pipeline Is Safer

pipe() error flow:              pipeline() error flow:
source ──► gzip ──► dest        source ──► gzip ──► dest
  │                    │           │                    │
  └── error ───────────┘           └── error ───────────┘
       ↑ gzip still open                ↑ ALL destroyed
       ↑ dest still open                ↑ callback called
       ↑ memory leak
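The difference in the diagram can be checked with a small experiment. The sketch below uses two hypothetical helpers, failingSource (a Readable that errors on its first read) and nullSink (a Writable that discards data), then inspects the sink's destroyed flag after the failure. With pipe() the sink should still be open; with pipeline() it should have been destroyed.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical source that fails as soon as it is asked for data.
function failingSource() {
  return new Readable({
    read() {
      this.destroy(new Error('source failed'));
    },
  });
}

// Hypothetical sink that accepts and discards every chunk.
function nullSink() {
  return new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
}

// Resolves with sink.destroyed after the source has failed under pipe().
function sinkDestroyedWithPipe() {
  return new Promise((resolve) => {
    const source = failingSource();
    const sink = nullSink();
    source.on('error', () => {
      // Wait one turn of the event loop so any cleanup would have run.
      setImmediate(() => resolve(sink.destroyed));
    });
    source.pipe(sink);
  });
}

// Resolves with sink.destroyed after the source has failed under pipeline().
function sinkDestroyedWithPipeline() {
  return new Promise((resolve) => {
    const sink = nullSink();
    pipeline(failingSource(), sink, () => {
      setImmediate(() => resolve(sink.destroyed));
    });
  });
}

sinkDestroyedWithPipe().then((d) => console.log('pipe() destroyed sink:', d));
sinkDestroyedWithPipeline().then((d) => console.log('pipeline() destroyed sink:', d));
```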

Piping HTTP Requests (Real-World Example)

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  if (req.url === '/download') {
    // Stream a compressed file to the client
    res.writeHead(200, {
      'Content-Type': 'application/octet-stream',
      'Content-Encoding': 'gzip',
    });

    pipeline(
      fs.createReadStream('./large-data.json'),
      zlib.createGzip(),
      res,
      (err) => {
        if (err) {
          console.error('Stream failed:', err.message);
          if (!res.headersSent) {
            res.writeHead(500);
          }
          res.end();
        }
      },
    );
  } else {
    // Answer every other URL so the request does not hang
    res.writeHead(404);
    res.end();
  }
});

Piping as the Client

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');

// Download a file and save it to disk
http.get('http://example.com/large-file.zip', (response) => {
  pipeline(
    response,  // IncomingMessage (Readable)
    fs.createWriteStream('./downloaded.zip'),
    (err) => {
      if (err) {
        console.error('Download failed:', err.message);
      } else {
        console.log('Download complete');
      }
    },
  );
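}).on('error', (err) => {
  // Connection-level failures (DNS, refused connection) surface on the request, not the response
  console.error('Request failed:', err.message);
});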

The unpipe() Method

Remove a pipe connection:

const fs = require('fs');

const source = fs.createReadStream('data.txt');
const dest1 = fs.createWriteStream('copy1.txt');
const dest2 = fs.createWriteStream('copy2.txt');

// Start piping to dest1
source.pipe(dest1);

// Stop piping to dest1
source.unpipe(dest1);

// Pipe to dest2 instead
source.pipe(dest2);
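One detail worth seeing in action: unpipe() only detaches the destination; it does not end it. The in-memory sketch below assumes a PassThrough as the source and a hypothetical recordingSink helper, and waits one event-loop turn so the first chunk reaches dest1 before the switch.

```javascript
const { PassThrough, Writable } = require('stream');

// Hypothetical in-memory sink that records every chunk it receives as a string.
function recordingSink() {
  const received = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      callback();
    },
  });
  sink.received = received;
  return sink;
}

const nextTurn = () => new Promise((resolve) => setImmediate(resolve));

async function switchDestinations() {
  const source = new PassThrough();
  const dest1 = recordingSink();
  const dest2 = recordingSink();

  source.pipe(dest1);
  source.write('first');
  await nextTurn(); // let 'first' flow through to dest1

  source.unpipe(dest1); // dest1 is detached but left open
  source.pipe(dest2);
  source.end('second');

  await new Promise((resolve) => dest2.on('finish', resolve));
  return {
    dest1: dest1.received,
    dest2: dest2.received,
    dest1Ended: dest1.writableEnded,
  };
}

switchDestinations().then((result) => console.log(result));
```

Because dest1 is never ended automatically, code that unpipes a destination is also responsible for calling end() or destroy() on it.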

Duplex Piping (Sockets)

Duplex streams (like TCP sockets) can be piped in both directions:

const net = require('net');
const { pipeline } = require('stream');

// Echo server with bidirectional piping
net.createServer((socket) => {
  pipeline(
    socket,  // Readable side of socket
    socket,  // Writable side of socket (echo back)
    (err) => {
      if (err) console.error('Socket error:', err.message);
    },
  );
}).listen(3000);

// Client side
const client = net.connect(3000);
client.write('Hello');
client.on('data', (data) => console.log('Echo:', data.toString()));

Practical: Log Archiver

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');
const path = require('path');

class LogArchiver {
  constructor(logDir, archiveDir) {
    this.logDir = logDir;
    this.archiveDir = archiveDir;
  }

  async archiveLogFile(filename) {
    const source = path.join(this.logDir, filename);
    const target = path.join(this.archiveDir, `${filename}.gz`);

    console.log(`Archiving ${filename}...`);

    try {
      await pipeline(
        fs.createReadStream(source),
        zlib.createGzip({ level: 9 }),
        fs.createWriteStream(target),
      );

      // Remove original after successful archive
      await fs.promises.unlink(source);
      console.log(`Archived: ${filename} (${target})`);
    } catch (err) {
      console.error(`Failed to archive ${filename}:`, err.message);
    }
  }

  async archiveAll() {
    const files = await fs.promises.readdir(this.logDir);
    const logFiles = files.filter(f => f.endsWith('.log'));

    for (const file of logFiles) {
      await this.archiveLogFile(file);
    }

    console.log(`Archived ${logFiles.length} log files`);
  }
}

When NOT to Use pipe()

Scenario                        | Why Not pipe?                            | Alternative
--------------------------------|------------------------------------------|---------------------------------------
Need to modify data per chunk   | pipe passes through unchanged            | Use Transform stream in the pipe chain
Need to control flow precisely  | pipe manages backpressure automatically  | Manual 'data' / 'drain' handling
Single writable, many sources   | pipe expects one-to-one                  | Multiple write() calls
Need to inspect each chunk      | pipe is fire-and-forget                  | Transform with logging
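For the first and last rows, a Transform placed in the chain can both modify and inspect chunks. The following is a minimal sketch under stated assumptions: createUpperCaseTransform and upperCaseAll are hypothetical names, and the bytesSeen counter stands in for whatever inspection or logging is needed.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical transform: upper-cases each chunk and counts the bytes it has seen.
function createUpperCaseTransform() {
  const transform = new Transform({
    transform(chunk, encoding, callback) {
      transform.bytesSeen += chunk.length;            // inspect the chunk
      callback(null, chunk.toString().toUpperCase()); // emit the modified chunk
    },
  });
  transform.bytesSeen = 0;
  return transform;
}

// Runs the given string parts through the transform and collects the result.
async function upperCaseAll(parts) {
  const output = [];
  const upper = createUpperCaseTransform();
  const sink = new Writable({
    write(chunk, encoding, callback) {
      output.push(chunk.toString());
      callback();
    },
  });
  await pipeline(Readable.from(parts), upper, sink);
  return { text: output.join(''), bytesSeen: upper.bytesSeen };
}

upperCaseAll(['ab', 'cd']).then((result) => console.log(result));
```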

Key Takeaways

  • pipe() connects readable → writable with automatic backpressure management
  • Pipe chaining lets you compose: source → transform → transform → dest
  • pipe() does NOT propagate errors: an error on one stream can leak memory
  • Always use pipeline() instead of pipe(): it handles errors and cleanup correctly
  • pipeline() has a callback (Node 10+) and a promise version (Node 15+ via stream/promises)
  • unpipe() removes a pipe connection from a stream
  • For HTTP, pipe the response into a file write stream to download (client side), or pipe a file read stream into the response to serve a file (server side)
  • For multiple destinations, you must manage backpressure manually or use tee patterns
  • When something goes wrong in a pipe chain, destroy all streams or use pipeline()