Stream-Powered APIs — fs & http · Astro Tech Blog

Streams Are Everywhere in Node.js

The two most common places you encounter streams in Node.js are:

  • fs — reading and writing files
  • http — sending and receiving HTTP requests

Once you understand streams in these modules, you understand streams everywhere.

File System Streams

fs.createReadStream

The most common way to read files with streams:

const fs = require('fs');

// Open a Readable stream over 'large-file.txt' with each option written out
// explicitly. The inline comments describe what each option controls; the ones
// marked "(default)" are noted as such by the author.
const readStream = fs.createReadStream('large-file.txt', {
  flags: 'r',            // Open for reading (default)
  encoding: 'utf8',       // Get strings, not Buffers
  fd: null,               // File descriptor (null = open by path)
  mode: 0o666,            // File permissions (used if creating)
  autoClose: true,        // Close file descriptor on error/end
  emitClose: true,        // Emit 'close' event
  start: 0,              // Start reading at byte 0
  end: Infinity,          // Read until this byte (inclusive)
  highWaterMark: 65536,   // Read 64KB chunks (default)
});

Reading a Specific Byte Range

// Read bytes 100-199 of a file.
// `start` and `end` are byte offsets and `end` is inclusive, so this range
// covers 100 bytes.
// NOTE(review): because the range is expressed in bytes while `encoding` is
// 'utf8', a multi-byte character could presumably be cut at either edge —
// confirm the file is single-byte text before relying on this.
const stream = fs.createReadStream('file.txt', {
  start: 100,
  end: 199,
  encoding: 'utf8',
});

// Accumulate the decoded chunks; the range is small, so holding it in memory
// is fine here. No 'error' listener is attached in this snippet.
let data = '';
stream.on('data', (chunk) => { data += chunk; });
stream.on('end', () => console.log('Bytes 100-199:', data));

fs.createWriteStream

// Open a Writable stream to 'output.txt' with each option written out
// explicitly; the inline comments describe what each option controls.
const writeStream = fs.createWriteStream('output.txt', {
  flags: 'w',            // 'w' = overwrite, 'a' = append
  encoding: 'utf8',
  fd: null,
  mode: 0o666,
  autoClose: true,
  emitClose: true,
  start: 0,              // Start writing at byte offset
  highWaterMark: 16384,  // 16KB internal buffer
});

Streaming File Copy

The classic use case — copy a file without buffering it entirely in memory:

const fs = require('fs');
const { pipeline } = require('stream/promises');

/**
 * Copy a file by streaming it from `src` to `dest`.
 *
 * The file is moved chunk by chunk through `pipeline()`, so it is never held
 * in memory as a whole, and both streams are torn down if either side fails.
 *
 * @param {string} src - Path of the file to read.
 * @param {string} dest - Path of the file to create or overwrite.
 * @returns {Promise<void>} Resolves once the destination has been fully written;
 *   rejects with the underlying stream error otherwise.
 */
async function copyFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    fs.createWriteStream(dest),
  );
  // The arrow in this message was mis-encoded in the original text.
  console.log(`Copied ${src} → ${dest}`);
}

// Copy a 10GB file using ~64KB of RAM
await copyFile('/mnt/data/backup.sql', '/backup/backup-copy.sql');

Streaming File Hash (No Buffering)

const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream/promises');

/**
 * Compute the SHA-256 digest of a file without loading it into memory.
 *
 * The file is streamed through `pipeline()` and every chunk is fed to the hash
 * with `update()`. The hash object is used only through its update/digest API
 * (not as a stream), so `digest()` is called exactly once on an object that
 * has not been finalized by stream machinery.
 *
 * @param {string} filePath - Path of the file to hash.
 * @returns {Promise<string>} The lowercase hex SHA-256 digest. The digest is
 *   also logged, as before.
 */
async function getFileHash(filePath) {
  const hash = crypto.createHash('sha256');

  await pipeline(
    fs.createReadStream(filePath),
    // Final pipeline stage: consume the file chunk by chunk.
    async (source) => {
      for await (const chunk of source) {
        hash.update(chunk);
      }
    },
  );

  const digest = hash.digest('hex');
  console.log('SHA256:', digest);
  return digest;
}

await getFileHash('/downloads/ubuntu.iso'); // Works for any file size

HTTP Streams

Streaming an HTTP Response to a File

When you download a file via HTTP, the response is a Readable stream. Pipe it directly to disk:

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');

/**
 * Download `url` over plain HTTP and stream the response body to `destPath`.
 *
 * @param {string} url - An `http://` URL (this helper uses the `http` module only).
 * @param {string} destPath - File to create or overwrite with the response body.
 * @returns {Promise<void>} Resolves when the body has been fully written.
 *   Rejects with `Error('HTTP <status>')` for any non-200 response, or with the
 *   request/stream error when the transfer fails.
 */
async function download(url, destPath) {
  return new Promise((resolve, reject) => {
    const request = http.get(url, (response) => {
      if (response.statusCode !== 200) {
        // An unread response body keeps the socket busy; drain and discard it.
        response.resume();
        reject(new Error(`HTTP ${response.statusCode}`));
        return;
      }

      // Content-Length is optional (e.g. chunked responses), so only report a
      // size when the header actually parses to a number.
      const totalSize = Number.parseInt(response.headers['content-length'], 10);
      if (Number.isNaN(totalSize)) {
        console.log('Downloading (size unknown)');
      } else {
        console.log(`Downloading ${(totalSize / 1024 / 1024).toFixed(1)} MB`);
      }

      pipeline(response, fs.createWriteStream(destPath)).then(
        () => {
          console.log('Download complete:', destPath);
          resolve();
        },
        (err) => {
          // Do not leave a truncated file behind; report the transfer error
          // regardless of whether the cleanup itself succeeds.
          fs.unlink(destPath, () => reject(err));
        },
      );
    });

    // Connection-level failures (DNS, refused connection, ...) surface here.
    request.on('error', reject);
  });
}

await download('http://example.com/large-file.zip', './download.zip');

Streaming a File as an HTTP Response (Server)

The reverse — send a file from disk over HTTP:

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');
const mime = require('mime-types'); // npm install mime-types

/**
 * Static download server: `GET /files/<relative path>` streams the named file
 * as an attachment; every other URL gets a 404.
 *
 * Requested paths are resolved against the process working directory (the
 * same directory relative paths were resolved against before) and anything
 * that would land outside of it is rejected, so the URL can no longer be used
 * to read arbitrary files such as `/files//etc/passwd` or `/files/../secret`.
 */
const server = http.createServer((req, res) => {
  const prefix = '/files/';

  // Always answer: a request that matches no route must not be left hanging.
  if (!req.url.startsWith(prefix)) {
    res.writeHead(404);
    res.end('Not found');
    return;
  }

  // Loaded lazily, as the original snippet did; require() caches the module.
  const path = require('path');
  const rootDir = process.cwd();

  let requestedPath;
  try {
    // Drop any query string and undo percent-encoding (e.g. %20 -> space).
    requestedPath = decodeURIComponent(req.url.slice(prefix.length).split('?')[0]);
  } catch {
    res.writeHead(400);
    res.end('Bad request');
    return;
  }

  const filePath = path.resolve(rootDir, requestedPath);
  const relativePath = path.relative(rootDir, filePath);
  const escapesRoot =
    relativePath === '' ||
    relativePath === '..' ||
    relativePath.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relativePath);

  // fs APIs throw synchronously on NUL bytes, so reject those up front too.
  if (requestedPath.includes('\0') || escapesRoot) {
    res.writeHead(403);
    res.end('Forbidden');
    return;
  }

  fs.stat(filePath, (err, stats) => {
    // Directories pass stat() but cannot be streamed as a file.
    if (err || !stats.isFile()) {
      res.writeHead(404);
      res.end('File not found');
      return;
    }

    // Quotes, backslashes and line breaks would corrupt the quoted filename.
    const downloadName = path.basename(filePath).replace(/["\\\r\n]/g, '_');

    res.writeHead(200, {
      'Content-Type': mime.lookup(filePath) || 'application/octet-stream',
      'Content-Length': stats.size,
      'Content-Disposition': `attachment; filename="${downloadName}"`,
    });

    // Stream the file — never buffers it in RAM.
    pipeline(
      fs.createReadStream(filePath),
      res,
      (pipelineErr) => {
        if (pipelineErr) {
          // The 200 header is already out, so the status cannot be changed;
          // pipeline() destroys the response, which tells the client the
          // download was cut short.
          console.error('Stream error:', pipelineErr.message);
        }
      },
    );
  });
});

server.listen(3000);

Streaming HTTP Request Body (File Upload)

When a client uploads a file, the request body arrives as a stream. Save it directly to disk:

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');

/**
 * Upload server: `POST /upload` streams the request body to
 * `./uploads/upload-<timestamp>.dat`; every other request gets a 404.
 *
 * The `./uploads` directory is expected to exist already.
 */
const server = http.createServer((req, res) => {
  // Always answer: a request that matches no route must not be left hanging.
  if (req.method !== 'POST' || req.url !== '/upload') {
    res.writeHead(404);
    res.end('Not found');
    return;
  }

  const filename = `upload-${Date.now()}.dat`;
  const destination = `./uploads/${filename}`;

  pipeline(
    req, // IncomingMessage is a Readable stream
    fs.createWriteStream(destination),
    (err) => {
      if (err) {
        console.error('Upload failed:', err.message);

        // Best-effort removal of the partially written file; the file may not
        // exist at all if the write stream never managed to open it.
        fs.unlink(destination, (unlinkErr) => {
          if (unlinkErr && unlinkErr.code !== 'ENOENT') {
            console.error('Could not remove partial upload:', unlinkErr.message);
          }
        });

        if (!res.headersSent) {
          res.writeHead(500);
        }
        res.end('Upload failed');
        return;
      }

      console.log('File uploaded:', filename);
      res.writeHead(201);
      res.end(`Uploaded: ${filename}`);
    },
  );
});

Compression with Streams

Gzip an HTTP Response on the Fly

Compress large responses before sending — dramatically reduces bandwidth:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

/**
 * Report server: `GET /api/report` streams `./large-report.json`, gzipped on
 * the fly when the client's Accept-Encoding header mentions gzip; every other
 * URL gets a 404.
 */
const server = http.createServer((req, res) => {
  // Always answer: a request that matches no route must not be left hanging.
  if (req.url !== '/api/report') {
    res.writeHead(404);
    res.end('Not found');
    return;
  }

  // Check if the client supports gzip.
  const acceptsGzip = req.headers['accept-encoding']?.includes('gzip') ?? false;

  // Both variants are JSON; `Vary` tells caches the body depends on the
  // request's Accept-Encoding header.
  const headers = {
    'Content-Type': 'application/json',
    'Vary': 'Accept-Encoding',
  };
  if (acceptsGzip) {
    headers['Content-Encoding'] = 'gzip';
  }
  res.writeHead(200, headers);

  // Same pipeline in both cases; the gzip stage is simply optional.
  const stages = [fs.createReadStream('./large-report.json')];
  if (acceptsGzip) {
    stages.push(zlib.createGzip());
  }
  stages.push(res);

  const errorLabel = acceptsGzip ? 'Compression stream error:' : 'Stream error:';
  pipeline(...stages, (err) => {
    // Headers are already sent at this point, so a failure can only be
    // logged; pipeline() destroys the response for us.
    if (err) console.error(errorLabel, err.message);
  });
});

Compress on Write, Decompress on Read

const zlib = require('zlib');
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Write compressed data
/**
 * Gzip `data` and write the result to `filePath`.
 *
 * `pipeline()` accepts any iterable as its source, so the payload is passed as
 * a one-element array instead of being wrapped in a Readable created through
 * a per-call `require('stream')`.
 *
 * @param {string} filePath - File to create or overwrite.
 * @param {string|Buffer} data - Payload to compress; written as a single chunk.
 * @returns {Promise<void>} Resolves once the compressed file is fully written.
 */
async function writeCompressed(filePath, data) {
  await pipeline(
    [data],
    zlib.createGzip(),
    fs.createWriteStream(filePath),
  );
}

// Read compressed data
/**
 * Read a gzip-compressed file and return its decompressed contents as a string.
 *
 * Decompression is streamed, but the decompressed chunks are collected and
 * joined, so the whole result ends up in memory.
 *
 * @param {string} filePath - Path of the gzip file to read.
 * @returns {Promise<string>} The decompressed contents.
 */
async function readCompressed(filePath) {
  const parts = [];

  // Terminal pipeline stage: drains the gunzip output and yields nothing.
  const collect = async function* (decompressed) {
    for await (const part of decompressed) {
      parts.push(part);
    }
  };

  const compressedFile = fs.createReadStream(filePath);
  const gunzip = zlib.createGunzip();
  await pipeline(compressedFile, gunzip, collect);

  const contents = Buffer.concat(parts);
  return contents.toString();
}

// Usage
await writeCompressed('data.json.gz', JSON.stringify({ users: 1000 }));
const data = await readCompressed('data.json.gz');
console.log('Recovered:', data);

Encryption with Streams

const crypto = require('crypto');
const fs = require('fs');
const { pipeline } = require('stream/promises');

const algorithm = 'aes-256-gcm';
// Demo key: generated once per process, so files encrypted by one run cannot
// be decrypted by another. A real application would load a persistent key.
const key = crypto.randomBytes(32);

/**
 * Encrypt `inputPath` into `outputPath` with AES-256-GCM, streaming the data.
 *
 * Two sidecar files are written next to the ciphertext because decryption
 * needs both of them:
 *   - `<outputPath>.iv`  — the random nonce used for this file
 *   - `<outputPath>.tag` — the GCM authentication tag
 *
 * @param {string} inputPath - Plaintext file to read.
 * @param {string} outputPath - Ciphertext file to create or overwrite.
 * @returns {Promise<void>} Resolves once the ciphertext and both sidecar
 *   files are written.
 */
async function encryptFile(inputPath, outputPath) {
  // 12 bytes (96 bits) is the nonce length recommended for GCM.
  const iv = crypto.randomBytes(12);

  const cipher = crypto.createCipheriv(algorithm, key, iv);

  // Write IV first (needed for decryption)
  await fs.promises.writeFile(outputPath + '.iv', iv);

  await pipeline(
    fs.createReadStream(inputPath),
    cipher,
    fs.createWriteStream(outputPath),
  );

  // The tag only exists once the cipher has been finalized, i.e. after the
  // pipeline has completed. Without it, GCM decryption cannot succeed.
  await fs.promises.writeFile(outputPath + '.tag', cipher.getAuthTag());

  console.log('File encrypted');
}

/**
 * Decrypt a file produced by `encryptFile`, streaming the data.
 *
 * Reads the nonce from `<inputPath>.iv` and the authentication tag from
 * `<inputPath>.tag`.
 *
 * @param {string} inputPath - Ciphertext file to read.
 * @param {string} outputPath - Plaintext file to create or overwrite.
 * @returns {Promise<void>} Resolves once the plaintext is written. Rejects if
 *   a sidecar file is missing or authentication fails (wrong key, or
 *   ciphertext/tag modified); in that case `outputPath` is removed.
 */
async function decryptFile(inputPath, outputPath) {
  const [iv, authTag] = await Promise.all([
    fs.promises.readFile(inputPath + '.iv'),
    fs.promises.readFile(inputPath + '.tag'),
  ]);

  const decipher = crypto.createDecipheriv(algorithm, key, iv);
  // Must be set before the decipher is finalized at the end of the stream.
  decipher.setAuthTag(authTag);

  try {
    await pipeline(
      fs.createReadStream(inputPath),
      decipher,
      fs.createWriteStream(outputPath),
    );
  } catch (err) {
    // Streaming means plaintext reaches disk before the tag is verified, so
    // never leave unauthenticated output behind.
    await fs.promises.rm(outputPath, { force: true });
    throw err;
  }

  console.log('File decrypted');
}

Practical: Streaming HTTP Proxy

A proxy that streams data without buffering — memory usage stays low regardless of file size:

const http = require('http');
const { pipeline } = require('stream');

/**
 * Forward HTTP proxy: the client sends an absolute-form request target
 * (`GET http://host/path`), which is replayed against the target host with
 * both the request body and the response body streamed through unbuffered.
 *
 * Responds 400 when the request target is not an absolute `http:` URL and
 * 502 when the upstream request fails before any response has started.
 */
const server = http.createServer((clientReq, clientRes) => {
  let url;
  try {
    // Ordinary (origin-form) requests carry only a path such as "/index.html";
    // new URL() throws on those, which must not take the whole server down.
    url = new URL(clientReq.url);
  } catch {
    clientRes.writeHead(400);
    clientRes.end('Proxy requests must use an absolute URL');
    return;
  }

  // Requests are replayed with the plain `http` module only.
  if (url.protocol !== 'http:') {
    clientRes.writeHead(400);
    clientRes.end('Only http: targets are supported');
    return;
  }

  const options = {
    hostname: url.hostname,
    port: url.port || 80,
    path: url.pathname + url.search,
    method: clientReq.method,
    headers: clientReq.headers,
  };

  // Forward the request to the target server
  const proxyReq = http.request(options, (proxyRes) => {
    // Forward the response status and headers
    clientRes.writeHead(proxyRes.statusCode, proxyRes.headers);

    // Stream the response body — no buffering
    pipeline(proxyRes, clientRes, (err) => {
      if (err) console.error('Proxy response error:', err.message);
    });
  });

  // Stream the client request body to the proxy
  pipeline(clientReq, proxyReq, (err) => {
    if (!err) return;

    console.error('Proxy request error:', err.message);

    // Upstream failed before answering (refused connection, DNS failure, ...):
    // tell the client instead of leaving the request hanging.
    if (!clientRes.headersSent && !clientRes.destroyed) {
      clientRes.writeHead(502);
      clientRes.end('Bad gateway');
    }
  });
});

server.listen(8080);
console.log('HTTP proxy listening on :8080');

Practical: Log Rotator with Streams

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');
const path = require('path');

/**
 * Append-only log writer that gzips and truncates the log file once it
 * reaches `maxSize` bytes.
 *
 * `write()`, `rotate()` and `close()` run strictly one after another on an
 * internal queue, so lines written while a rotation is in progress wait for
 * the fresh stream instead of hitting the one that is being closed.
 *
 * Usage: `const log = new LogRotator(path); await log.init(); await log.write('...');`
 */
class LogRotator {
  // Tail of the promise chain that serializes all operations.
  #tail = Promise.resolve();

  /**
   * @param {string} logPath - Path of the active log file.
   * @param {number} [maxSize] - Size in bytes that triggers a rotation (100MB default).
   */
  constructor(logPath, maxSize = 100 * 1024 * 1024) {
    this.logPath = logPath;
    this.maxSize = maxSize; // 100MB default
    this.writeStream = null;
    this.currentSize = 0;
  }

  /**
   * Pick up the size of an existing log file and open it for appending.
   * Must be awaited before the first `write()`.
   * @returns {Promise<void>}
   */
  async init() {
    try {
      const stats = await fs.promises.stat(this.logPath);
      this.currentSize = stats.size;
    } catch {
      // Deliberately best-effort: when the file cannot be stat'ed (typically
      // because it does not exist yet), start counting from zero.
      this.currentSize = 0;
    }

    this.writeStream = fs.createWriteStream(this.logPath, { flags: 'a' });
  }

  /**
   * Append one line (a trailing newline is added) and rotate if the log has
   * reached `maxSize`.
   * @param {string} line - Text to append.
   * @returns {Promise<void>} Resolves once the line has been handed to the
   *   stream and any rotation it triggered has finished.
   */
  write(line) {
    return this.#enqueue(() => this.#writeNow(line));
  }

  /**
   * Compress the current log into `<logPath>.<timestamp>.gz`, truncate the
   * log file and reopen it for appending.
   * @returns {Promise<void>}
   */
  rotate() {
    return this.#enqueue(() => this.#rotateNow());
  }

  /**
   * Flush pending data and close the log file. `init()` must be called again
   * before further writes.
   * @returns {Promise<void>}
   */
  close() {
    return this.#enqueue(async () => {
      if (!this.writeStream) return;
      await new Promise((resolve) => this.writeStream.end(resolve));
      this.writeStream = null;
    });
  }

  // Run `task` after every previously queued operation has settled.
  #enqueue(task) {
    const result = this.#tail.then(task);
    // A failed operation must not block the ones queued behind it.
    this.#tail = result.catch(() => {});
    return result;
  }

  // Fail with a clear message instead of a TypeError on a null stream.
  #requireStream() {
    if (!this.writeStream) {
      throw new Error('LogRotator: call init() before writing or rotating');
    }
    return this.writeStream;
  }

  // Resolve on the stream's next 'drain', or reject if it errors first.
  #waitForDrain(stream) {
    return new Promise((resolve, reject) => {
      const onDrain = () => {
        stream.off('error', onError);
        resolve();
      };
      const onError = (err) => {
        stream.off('drain', onDrain);
        reject(err);
      };
      stream.once('drain', onDrain);
      stream.once('error', onError);
    });
  }

  // Append a single line; only ever called from the queue.
  async #writeNow(line) {
    const stream = this.#requireStream();
    const data = line + '\n';

    const hasRoom = stream.write(data);
    this.currentSize += Buffer.byteLength(data);

    // Respect backpressure: when the internal buffer is full, wait for it to
    // empty before accepting more lines.
    if (!hasRoom) {
      await this.#waitForDrain(stream);
    }

    if (this.currentSize >= this.maxSize) {
      await this.#rotateNow();
    }
  }

  // Perform the rotation; only ever called from the queue.
  async #rotateNow() {
    const stream = this.#requireStream();

    // Close current file (waits until buffered lines have been written)
    await new Promise((resolve) => stream.end(resolve));

    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const archivePath = `${this.logPath}.${timestamp}.gz`;

    try {
      // Compress the old log
      await pipeline(
        fs.createReadStream(this.logPath),
        zlib.createGzip(),
        fs.createWriteStream(archivePath),
      );

      // Truncate the original
      await fs.promises.truncate(this.logPath, 0);
      this.currentSize = 0;
    } finally {
      // Reopen even when archiving failed, so logging can continue.
      this.writeStream = fs.createWriteStream(this.logPath, { flags: 'a' });
    }

    console.log(`Log rotated: ${archivePath}`);
  }
}

Practical: CSV to JSON Converter (Streaming)

const fs = require('fs');
const { Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

/**
 * Transform stream that turns CSV text into newline-delimited JSON.
 *
 * The first non-blank line supplies the column names; each later non-blank
 * line is emitted as one JSON object followed by '\n'. Fields are split on
 * every comma and trimmed — quoted fields containing commas are not supported.
 *
 * Accepts string chunks as well as Buffer/Uint8Array chunks; bytes are decoded
 * as UTF-8 with a streaming decoder so that a character split across two
 * chunks is decoded correctly.
 */
class CsvParser extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
    this.headers = null;
    // Holds back an incomplete multi-byte sequence until the next chunk.
    this.decoder = new TextDecoder('utf-8');
  }

  _transform(chunk, encoding, callback) {
    const text = chunk instanceof Uint8Array
      ? this.decoder.decode(chunk, { stream: true })
      : String(chunk);

    this.buffer += text;
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      this.#handleLine(line);
    }

    callback();
  }

  _flush(callback) {
    // Drain any bytes the decoder is still holding, then treat what is left
    // as the final line. It goes through the same path as every other line,
    // so a header-only input without a trailing newline is handled as well.
    this.buffer += this.decoder.decode();
    this.#handleLine(this.buffer);
    this.buffer = '';
    callback();
  }

  // Parse one complete line: the first becomes the header row, later ones are
  // emitted as JSON records.
  #handleLine(line) {
    if (!line.trim()) return;

    const values = line.split(',').map((value) => value.trim());

    if (!this.headers) {
      this.headers = values;
      return;
    }

    const record = Object.fromEntries(
      this.headers.map((header, index) => [header, values[index]]),
    );
    this.push(JSON.stringify(record) + '\n');
  }
}

// Convert a 5GB CSV to NDJSON using ~64KB of RAM
await pipeline(
  fs.createReadStream('users.csv', { encoding: 'utf8' }),
  new CsvParser(),
  fs.createWriteStream('users.ndjson'),
);

console.log('Conversion complete');

Key Takeaways

  • fs.createReadStream and fs.createWriteStream are the primary stream APIs for file I/O
  • Use start/end options to read specific byte ranges from a file
  • HTTP req (IncomingMessage) is a Readable stream — pipe incoming uploads directly to disk
  • HTTP res (ServerResponse) is a Writable stream — pipe files directly to the response
  • Content-Length should be set when streaming known-sized files
  • Content-Encoding: gzip enables on-the-fly compression — pair with zlib.createGzip()
  • Always use pipeline() instead of pipe() β€” error handling is critical in production
  • Streaming a file through crypto and zlib transforms encrypts/compresses without buffering
  • A proxy with pipeline(clientReq, proxyReq) and pipeline(proxyRes, clientRes) forwards data with zero buffering
  • Log rotators using streams can compress, archive, and truncate without holding the file in memory
  • Any data pipeline (CSV→JSON, encrypt→compress, download→save) can be built with streams using minimal memory