streams
Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install pluginagentmarketplace-custom-plugin-nodejs-streams
Repository
Skill path: skills/streams
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Backend, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: pluginagentmarketplace.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install streams into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/pluginagentmarketplace/custom-plugin-nodejs before adding streams to shared team environments
- Use streams for development workflows
Original source / Raw SKILL.md
---
name: streams
description: Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines
version: "2.1.0"
sasmp_version: "1.3.0"
bonded_agent: 03-async-programming
bond_type: PRIMARY_BOND
---
# Node.js Streams Skill
Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.
## Quick Start
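Node.js has four fundamental stream types:
1. **Readable** - Source of data (file, incoming HTTP request on a server)
2. **Writable** - Destination for data (file, HTTP response on a server)
3. **Transform** - A Duplex stream that modifies data as it passes through
4. **Duplex** - Both readable and writable, with independent sides (TCP socket)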
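As a rough illustration of the four types listed above, the sketch below builds one of each from the core `stream` module. The names `source`, `double`, `sink`, and `channel` are invented for this example and the data is held in memory.

```javascript
const { Readable, Writable, Transform, Duplex } = require('stream');

// Readable: a source of data (three strings from an in-memory array)
const source = Readable.from(['1', '2', '3']);

// Transform: rewrites each chunk in transit (doubles the number)
const double = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, `${Number(chunk.toString()) * 2}\n`);
  }
});

// Writable: a destination (appends everything to a string)
let output = '';
const sink = new Writable({
  write(chunk, encoding, callback) {
    output += chunk.toString();
    callback();
  }
});

// Duplex: readable and writable sides that work independently
const inbox = [];
const channel = new Duplex({
  read() {
    this.push('pong'); // data the readable side produces
    this.push(null);   // end of the readable side
  },
  write(chunk, encoding, callback) {
    inbox.push(chunk.toString()); // data the writable side receives
    callback();
  }
});

source.pipe(double).pipe(sink);
sink.on('finish', () => console.log(JSON.stringify(output)));

channel.end('ping');
channel.on('data', (chunk) => console.log('channel sent:', chunk.toString()));
```

What is written to `channel` never shows up on its readable side. That independence is the difference between a plain Duplex and a Transform.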
## Core Concepts
### Readable Stream
```javascript
const fs = require('fs');

// Create a readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',         // chunks arrive as strings instead of Buffers
  highWaterMark: 64 * 1024  // read up to 64 KiB from disk at a time
});

// Event-based consumption
readStream.on('data', (chunk) => {
  // With an encoding set, chunk.length counts characters, not bytes
  console.log(`Received ${chunk.length} characters`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});
```
### Writable Stream
```javascript
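const fs = require('fs');

const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');

// Handle backpressure: write() returns false once the internal buffer is full
const data = 'More data\n'; // placeholder payload
const ok = writeStream.write(data);
if (!ok) {
  // Wait for the 'drain' event before writing more
  writeStream.once('drain', () => {
    writeStream.end('Done\n'); // final write; signals the end of the stream
  });
} else {
  writeStream.end('Done\n'); // never call write() after end()
}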
```
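The drain pattern above can be wrapped in a reusable helper. The following is a minimal sketch, assuming all chunks are available as an iterable. `writeAll` and `slowSink` are hypothetical names, and the tiny `highWaterMark` is chosen only so that backpressure triggers in the demo.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Write every chunk in order, pausing whenever the stream's buffer is full.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      // write() returned false: wait for 'drain' before sending more
      await once(writable, 'drain');
    }
  }
  writable.end();
  await once(writable, 'finish'); // all data has been handed to the sink
}

// Demo destination with a tiny buffer so backpressure kicks in quickly
const received = [];
const slowSink = new Writable({
  highWaterMark: 4, // bytes
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate slow I/O
  }
});

const done = writeAll(slowSink, ['alpha', 'beta', 'gamma']);
done.then(() => console.log(received.join(',')));
```

`events.once` also rejects if the stream emits `'error'` while waiting, so a failing destination surfaces as a rejected promise rather than a hang.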
### Transform Stream
```javascript
const fs = require('fs');
const { Transform } = require('stream');

// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Usage (note: .pipe() does not forward errors; see the Pipeline section)
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));
```
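Chunk boundaries rarely line up with record boundaries, so a transform often has to carry leftover data from one chunk to the next and emit the remainder in `flush()`. The sketch below shows one possible way to do that for newline-delimited text. `createLineSplitter` and `demo` are hypothetical helpers, not part of Node.js.

```javascript
const { Readable, Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: re-chunks arbitrary byte chunks into whole lines.
function createLineSplitter() {
  const decoder = new StringDecoder('utf8'); // copes with split multi-byte characters
  let remainder = '';
  return new Transform({
    readableObjectMode: true, // emit one string per line
    transform(chunk, encoding, callback) {
      const lines = (remainder + decoder.write(chunk)).split('\n');
      remainder = lines.pop(); // last piece may be an incomplete line
      for (const line of lines) this.push(line);
      callback();
    },
    flush(callback) {
      // Called once after the final chunk: emit whatever is left over
      remainder += decoder.end();
      if (remainder) this.push(remainder);
      callback();
    }
  });
}

// Demo: the second line is split across two chunks
async function demo() {
  const lines = [];
  const input = Readable.from([Buffer.from('a,1\nb'), Buffer.from(',2\nc,3')]);
  for await (const line of input.pipe(createLineSplitter())) {
    lines.push(line);
  }
  return lines;
}

demo().then((lines) => console.log(lines));
```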
## Learning Path
### Beginner (1-2 weeks)
- ✅ Understand stream types
- ✅ Read/write file streams
- ✅ Basic pipe operations
- ✅ Handle stream events
### Intermediate (3-4 weeks)
- ✅ Transform streams
- ✅ Backpressure handling
- ✅ Object mode streams
- ✅ Pipeline utility
### Advanced (5-6 weeks)
- ✅ Custom stream implementation
- ✅ Async iterators
- ✅ Web Streams API
- ✅ Performance optimization
## Pipeline (Recommended)
```javascript
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}

// With transforms. csvParser, transformRow, and jsonStringify are placeholders
// for Transform stream factories that you supply; they are not built-ins.
async function convertCsvToJson() {
  await pipeline(
    fs.createReadStream('data.csv'),
    csvParser(),
    transformRow(),
    jsonStringify(),
    fs.createWriteStream('data.json')
  );
}
```
### Pipeline with Error Handling
```javascript
const { pipeline } = require('stream');

// source, transform1, transform2, and destination stand for your own streams
pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
```
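Besides reporting the error, `pipeline` also destroys the streams in the chain that have not finished yet, which is what prevents leaked file handles. The sketch below tries to make that visible with in-memory streams. `runFailingPipeline` and `failOnBad` are names made up for this illustration, and it uses the promise API from `stream/promises`.

```javascript
const { Readable, Writable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

async function runFailingPipeline() {
  const source = Readable.from(['ok', 'bad', 'never reached']);

  // Transform that fails as soon as it sees the chunk "bad"
  const failOnBad = new Transform({
    transform(chunk, encoding, callback) {
      if (chunk.toString() === 'bad') {
        callback(new Error('bad chunk')); // fails the whole pipeline
      } else {
        callback(null, chunk);
      }
    }
  });

  // Destination that simply discards what it receives
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });

  try {
    await pipeline(source, failOnBad, sink);
    return { message: null };
  } catch (err) {
    // By now pipeline has destroyed the unfinished streams in the chain
    return {
      message: err.message,
      transformDestroyed: failOnBad.destroyed,
      sinkDestroyed: sink.destroyed
    };
  }
}

runFailingPipeline().then((result) => console.log(result));
```

With plain `.pipe()` the same failure would leave `sink` open, because `.pipe()` neither forwards errors nor cleans up the destination.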
## HTTP Streaming
```javascript
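const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');

// Stream a file as an HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';
  // Async stat: statSync would block the event loop on every request
  fs.stat(filePath, (statErr, stat) => {
    if (statErr) {
      res.writeHead(404);
      return res.end('Not found');
    }
    res.writeHead(200, {
      'Content-Type': 'video/mp4',
      'Content-Length': stat.size
    });
    // Stream instead of loading the entire file into memory; pipeline
    // destroys both streams if the read fails or the client disconnects
    pipeline(fs.createReadStream(filePath), res, (err) => {
      if (err) console.error('Response stream failed:', err);
    });
  });
}).listen(3000);

// Stream an HTTP request body to disk
http.createServer((req, res) => {
  pipeline(req, fs.createWriteStream('./upload.bin'), (err) => {
    if (err) {
      res.statusCode = 500;
      return res.end('Upload failed');
    }
    res.end('Upload complete'); // sent only after the file is fully written
  });
}).listen(3001);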
```
## Object Mode Streams
```javascript
const { Transform } = require('stream');

// Expects each incoming chunk to be one complete JSON document
const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk);
      this.push(obj);
      callback();
    } catch (err) {
      callback(err); // invalid JSON fails the stream
    }
  }
});

// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});
```
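To show how object-mode transforms like these fit together, here is a small self-contained sketch that parses JSON strings, tags each record, and collects the results. The factory names `parseJson` and `markProcessed` and the `run` helper are assumptions for this example. It copies each record instead of mutating it, which is a design choice rather than a requirement.

```javascript
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Parses one JSON document per incoming chunk
const parseJson = () => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      callback(null, JSON.parse(chunk));
    } catch (err) {
      callback(err);
    }
  }
});

// Adds a flag to every record passing through
const markProcessed = () => new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    callback(null, { ...record, processed: true });
  }
});

async function run(jsonLines) {
  const records = [];
  await pipeline(
    Readable.from(jsonLines),
    parseJson(),
    markProcessed(),
    // Final stage: an async function that drains the stream
    async function (source) {
      for await (const record of source) records.push(record);
    }
  );
  return records;
}

run(['{"id":1}', '{"id":2}']).then((records) => console.log(records));
```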
## Async Iterators
```javascript
const { Readable } = require('stream');

// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}

// Readable.from() creates an object-mode stream by default
const stream = Readable.from(generateData(), { objectMode: true });

// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}

processStream(stream).catch(console.error);
```
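One useful property of `for await` consumption is that leaving the loop early destroys the stream, so an unbounded source can be sampled safely. The sketch below assumes a hypothetical endless generator `counter` and a helper `take`. Neither is part of Node.js.

```javascript
const { Readable } = require('stream');

// Endless hypothetical source: yields 0, 1, 2, ... forever
async function* counter() {
  let n = 0;
  while (true) yield n++;
}

// Collect only the first `limit` items, then stop reading
async function take(readable, limit) {
  const items = [];
  if (limit <= 0) return items;
  for await (const item of readable) {
    items.push(item);
    if (items.length >= limit) break; // leaving the loop destroys the stream
  }
  return items;
}

const numbers = Readable.from(counter());
const firstThree = take(numbers, 3);
firstThree.then((items) => {
  console.log(items, 'destroyed:', numbers.destroyed);
});
```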
## Backpressure Handling
```javascript
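const fs = require('fs');
const { pipeline } = require('stream');

// Option 1: manual backpressure handling
const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // Check if writable can accept more data
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Pause reading until writable is ready
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});
readable.on('end', () => writable.end()); // close the destination when done

// Option 2: use pipeline instead (handles backpressure and cleanup automatically)
pipeline(
  fs.createReadStream('huge-file.txt'),
  fs.createWriteStream('output-copy.txt'),
  (err) => {
    if (err) console.error('Error:', err);
  }
);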
```
## Custom Readable Stream
```javascript
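const { Readable } = require('stream');

// `db` is assumed to be a connected MongoDB database handle
class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  _read() {
    // _read() must not return a rejected promise, so route failures to destroy()
    this._readNext().catch((err) => this.destroy(err));
  }

  async _readNext() {
    if (!this.cursor) {
      // With the MongoDB driver, find() already returns a cursor
      this.cursor = db.collection('items').find(this.query);
    }
    const doc = await this.cursor.next();
    if (doc) {
      this.push(doc);
    } else {
      this.push(null); // Signal end
    }
  }
}

// Usage (in CommonJS, for await must run inside an async function)
async function printActiveItems() {
  const dbStream = new DatabaseStream({ status: 'active' });
  for await (const item of dbStream) {
    console.log(item);
  }
}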
```
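The same approach works for the writing side. As a counterpart, the sketch below shows a custom Writable that groups incoming records into batches, under the assumption that the caller supplies an async `saveBatch` function. `BatchWriter`, `saveBatch`, and `demo` are hypothetical names, and the demo only stores batches in an array.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical sink that groups records into batches before "saving" them
class BatchWriter extends Writable {
  constructor(batchSize, saveBatch) {
    super({ objectMode: true });
    this.batchSize = batchSize;
    this.saveBatch = saveBatch; // async function supplied by the caller
    this.batch = [];
  }

  _write(record, encoding, callback) {
    this.batch.push(record);
    if (this.batch.length < this.batchSize) return callback();
    // Batch is full: delay the callback until it is saved (this is backpressure)
    this._flushBatch().then(() => callback(), callback);
  }

  // Called after end() and before 'finish': save the last partial batch
  _final(callback) {
    if (this.batch.length === 0) return callback();
    this._flushBatch().then(() => callback(), callback);
  }

  async _flushBatch() {
    const batch = this.batch;
    this.batch = [];
    await this.saveBatch(batch);
  }
}

async function demo() {
  const saved = [];
  const writer = new BatchWriter(2, async (batch) => {
    saved.push(batch);
  });
  await pipeline(Readable.from([1, 2, 3, 4, 5]), writer);
  return saved;
}

demo().then((saved) => console.log(saved));
```

Because `_write` withholds its callback while a batch is being saved, upstream producers slow down automatically when the save step is slow.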
## Unit Test Template
```javascript
// Assumes a Jest-style runner that provides describe, it, and expect
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];
    const upperCase = new Transform({
      // Object mode keeps chunk boundaries; a byte stream may merge buffered chunks
      objectMode: true,
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });
    await pipeline(
      input,
      upperCase,
      async function* (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );
    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});
```
## Troubleshooting
| Problem | Cause | Solution |
|---------|-------|----------|
| Memory grows without bound | Backpressure ignored (`write()` return value unchecked) | Use `pipeline()` or wait for `'drain'` |
| Data loss or leaked file handles | Errors not handled; `.pipe()` does not forward them | Use `pipeline()` and handle its error |
| Slow processing | Chunk size too small | Increase `highWaterMark` |
| Stream never finishes | Missing `end()` call | Call `writable.end()` |
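
To get a feel for the `highWaterMark` row, the sketch below counts how many chunks a file arrives in for two buffer sizes. It is only an illustration: `countChunks` and `compare` are hypothetical helpers, the 256 KiB temporary file exists only for the demo, and whether fewer chunks means faster processing depends on the workload.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Count how many chunks a file is delivered in for a given highWaterMark
async function countChunks(filePath, highWaterMark) {
  let chunks = 0;
  let bytes = 0;
  for await (const chunk of fs.createReadStream(filePath, { highWaterMark })) {
    chunks += 1;
    bytes += chunk.length;
  }
  return { chunks, bytes };
}

async function compare() {
  // Temporary 256 KiB file, created only for this demonstration
  const dir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'hwm-'));
  const file = path.join(dir, 'sample.bin');
  await fs.promises.writeFile(file, Buffer.alloc(256 * 1024));
  try {
    const small = await countChunks(file, 1024);
    const large = await countChunks(file, 64 * 1024);
    return { small, large };
  } finally {
    await fs.promises.rm(dir, { recursive: true, force: true });
  }
}

compare().then((result) => console.log(result));
```

Larger chunks mean fewer callbacks per megabyte, at the cost of more memory held per stream.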
## When to Use
Use streams when:
- Processing large files (GB+)
- Real-time data processing
- Memory-constrained environments
- Building data pipelines
- HTTP request/response handling
## Related Skills
- Async Programming (async patterns)
- Performance Optimization (memory efficiency)
- Express REST API (streaming responses)
## Resources
- [Node.js Streams Docs](https://nodejs.org/api/stream.html)
- [Stream Handbook](https://github.com/substack/stream-handbook)
- [Node.js Streams Guide](https://nodejs.dev/learn/nodejs-streams)