@push.rocks/smartstream
A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.
readme.md for @push.rocks/smartstream
A TypeScript-first library for creating and manipulating Node.js and Web streams with built-in backpressure handling, async transformations, and seamless Node.js ↔ Web stream interoperability.
Issue Reporting and Security
For reporting bugs, issues, or security vulnerabilities, please visit community.foss.global/. This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a code.foss.global/ account to submit Pull Requests directly.
Install
pnpm install @push.rocks/smartstream
The package ships with two entry points:
| Entry Point | Import Path | Environment |
|---|---|---|
| Node.js (default) | @push.rocks/smartstream |
Node.js — full stream utilities, duplex, intake, wrappers, and Node↔Web helpers |
| Web | @push.rocks/smartstream/web |
Browser & Node.js — pure Web Streams API (WebDuplexStream) |
Usage
All examples use ESM / TypeScript syntax.
📦 Importing
// Node.js — full API
import {
SmartDuplex,
StreamWrapper,
StreamIntake,
createTransformFunction,
createPassThrough,
nodewebhelpers,
} from '@push.rocks/smartstream';
// Web — browser-safe, zero Node.js dependencies
import { WebDuplexStream } from '@push.rocks/smartstream/web';
🔄 SmartDuplex — The Core Stream Primitive
SmartDuplex extends Node.js Duplex with first-class async support, built-in backpressure management, and a clean functional API. Instead of overriding _transform or _write manually, you pass a writeFunction that receives each chunk along with a tools object.
Basic Transform
import { SmartDuplex } from '@push.rocks/smartstream';
const upperCaser = new SmartDuplex<Buffer, Buffer>({
writeFunction: async (chunk, tools) => {
// Return a value to push it downstream
return Buffer.from(chunk.toString().toUpperCase());
},
});
readableStream.pipe(upperCaser).pipe(writableStream);
Using tools.push() for Multiple Outputs
The writeFunction can emit multiple chunks per input via tools.push():
const splitter = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk, tools) => {
const words = chunk.split(' ');
for (const word of words) {
await tools.push(word);
}
// Returning nothing — output was already pushed
},
});
Final Function
Run cleanup or emit final data when the writable side ends:
const aggregator = new SmartDuplex<number, number>({
objectMode: true,
writeFunction: async (chunk, tools) => {
runningTotal += chunk;
// Don't emit anything per-chunk
},
finalFunction: async (tools) => {
return runningTotal; // Emitted as the last chunk
},
});
Truncating a Stream Early
Call tools.truncate() inside writeFunction to signal that no more data should be read:
const limiter = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk, tools) => {
if (chunk === 'STOP') {
tools.truncate();
return;
}
return chunk;
},
});
Creating from a Buffer
const stream = SmartDuplex.fromBuffer(Buffer.from('hello world'));
stream.on('data', (chunk) => console.log(chunk.toString())); // "hello world"
Creating from a Web ReadableStream
Bridge the Web Streams API into a Node.js Duplex:
const response = await fetch('https://example.com/data');
const nodeDuplex = SmartDuplex.fromWebReadableStream(response.body);
nodeDuplex.pipe(processTransform).pipe(outputStream);
Getting Web Streams from SmartDuplex
Convert a SmartDuplex into Web ReadableStream + WritableStream pair:
const duplex = new SmartDuplex({
writeFunction: async (chunk, tools) => {
return transform(chunk);
},
});
const { readable, writable } = await duplex.getWebStreams();
Debug Mode
Pass debug: true and name to get detailed internal logs:
const stream = new SmartDuplex({
name: 'MyStream',
debug: true,
writeFunction: async (chunk, tools) => chunk,
});
🧩 StreamWrapper — Pipeline Composition
StreamWrapper takes an array of streams, pipes them together, attaches error listeners on all of them, and returns a Promise that resolves when the pipeline finishes:
import { StreamWrapper } from '@push.rocks/smartstream';
import fs from 'fs';
const pipeline = new StreamWrapper([
fs.createReadStream('./input.txt'),
new SmartDuplex({
writeFunction: async (chunk) => Buffer.from(chunk.toString().toUpperCase()),
}),
fs.createWriteStream('./output.txt'),
]);
await pipeline.run();
console.log('Pipeline complete!');
Error handling is automatic — if any stream in the array errors, the returned promise rejects:
pipeline.run()
.then(() => console.log('Done'))
.catch((err) => console.error('Pipeline failed:', err));
You can also listen for custom events across all streams:
pipeline.onCustomEvent('progress', () => {
console.log('Progress event fired');
});
📥 StreamIntake — Dynamic Data Injection
StreamIntake is a Readable stream that lets you programmatically push data into a pipeline. It operates in object mode by default and provides a reactive observable (pushNextObservable) for demand-driven data production.
import { StreamIntake, SmartDuplex } from '@push.rocks/smartstream';
const intake = new StreamIntake<string>();
// Pipe through a transform
intake
.pipe(new SmartDuplex({
objectMode: true,
writeFunction: async (chunk) => {
console.log('Processing:', chunk);
return chunk;
},
}))
.on('data', (data) => console.log('Output:', data));
// Push data whenever it's ready
intake.pushData('Hello');
intake.pushData('World');
intake.signalEnd(); // Signal end-of-stream
Demand-driven Production with Observable
pushNextObservable emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
const intake = new StreamIntake<number>();
let counter = 0;
intake.pushNextObservable.subscribe(() => {
if (counter < 100) {
intake.pushData(counter++);
} else {
intake.signalEnd();
}
});
intake.pipe(consumer);
Creating from Existing Streams
Wrap a Node.js Readable or a Web ReadableStream:
// From Node.js Readable
const intake = await StreamIntake.fromStream<Buffer>(fs.createReadStream('./data.bin'));
// From Web ReadableStream
const response = await fetch('https://example.com/stream');
const intake = await StreamIntake.fromStream<Uint8Array>(response.body);
⚡ Utility Functions
createTransformFunction
Quickly create a SmartDuplex from a simple async mapping function:
import { createTransformFunction } from '@push.rocks/smartstream';
const doubler = createTransformFunction<number, number>(async (n) => n * 2);
intakeStream.pipe(doubler).pipe(outputStream);
createPassThrough
Create an object-mode passthrough stream (useful as an intermediary or tee point):
import { createPassThrough } from '@push.rocks/smartstream';
const passThrough = createPassThrough();
source.pipe(passThrough).pipe(destination);
🌐 WebDuplexStream — Pure Web Streams API
WebDuplexStream extends TransformStream and works in both browsers and Node.js. Import it from the /web subpath for zero Node.js dependencies.
import { WebDuplexStream } from '@push.rocks/smartstream/web';
const stream = new WebDuplexStream<number, number>({
writeFunction: async (chunk, { push }) => {
push(chunk * 2); // Push transformed data
},
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
// Write
await writer.write(5);
await writer.write(10);
await writer.close();
// Read
const { value } = await reader.read(); // 10
const { value: v2 } = await reader.read(); // 20
From a Uint8Array
const stream = WebDuplexStream.fromUInt8Array(new Uint8Array([1, 2, 3]));
const reader = stream.readable.getReader();
const { value } = await reader.read(); // Uint8Array [1, 2, 3]
Data Production with readFunction
Supply data into the stream from any async source:
const stream = new WebDuplexStream<string, string>({
readFunction: async (tools) => {
await tools.write('chunk 1');
await tools.write('chunk 2');
tools.done(); // Signal end
},
writeFunction: async (chunk, { push }) => {
push(chunk.toUpperCase());
},
});
const reader = stream.readable.getReader();
// reads "CHUNK 1", "CHUNK 2"
🔀 Node ↔ Web Stream Converters
The nodewebhelpers namespace provides bidirectional converters between Node.js and Web Streams:
import { nodewebhelpers } from '@push.rocks/smartstream';
| Function | From | To |
|---|---|---|
createWebReadableStreamFromFile(path) |
File path | Web ReadableStream<Uint8Array> |
convertWebReadableToNodeReadable(webStream) |
Web ReadableStream |
Node.js Readable |
convertNodeReadableToWebReadable(nodeStream) |
Node.js Readable |
Web ReadableStream |
convertWebWritableToNodeWritable(webWritable) |
Web WritableStream |
Node.js Writable |
convertNodeWritableToWebWritable(nodeWritable) |
Node.js Writable |
Web WritableStream |
Example: Serve a File as a Web ReadableStream
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./video.mp4');
// Use with fetch Response, service workers, etc.
return new Response(webStream, {
headers: { 'Content-Type': 'video/mp4' },
});
Example: Convert Between Stream Types
import fs from 'fs';
import { nodewebhelpers } from '@push.rocks/smartstream';
// Node → Web
const nodeReadable = fs.createReadStream('./data.bin');
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
// Web → Node
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
🏗️ Backpressure Handling
SmartDuplex uses a BackpressuredArray internally, bounded by highWaterMark (default: 1). When the downstream consumer is slow, the stream automatically pauses the upstream producer until space is available — no manual bookkeeping required.
const slow = new SmartDuplex({
name: 'SlowConsumer',
objectMode: true,
highWaterMark: 1,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 200));
return chunk;
},
});
const fast = new SmartDuplex({
name: 'FastProducer',
objectMode: true,
writeFunction: async (chunk, tools) => {
return chunk; // Instant processing
},
});
// Backpressure is handled automatically between fast → slow
fast.pipe(slow).on('data', (d) => console.log(d));
for (let i = 0; i < 100; i++) {
fast.write(`chunk-${i}`);
}
fast.end();
🎯 Real-World Example: Processing Pipeline
import fs from 'fs';
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
// Read → Transform → Filter → Write
const pipeline = new StreamWrapper([
fs.createReadStream('./access.log'),
new SmartDuplex({
writeFunction: async (chunk) => {
// Parse each line
return chunk.toString().split('\n');
},
}),
new SmartDuplex({
objectMode: true,
writeFunction: async (lines: string[], tools) => {
// Filter and push matching lines
for (const line of lines) {
if (line.includes('ERROR')) {
await tools.push(line + '\n');
}
}
},
}),
fs.createWriteStream('./errors.log'),
]);
await pipeline.run();
console.log('Error extraction complete');
License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the LICENSE file.
Please note: The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
Company Information
Task Venture Capital GmbH Registered at District Court Bremen HRB 35230 HB, Germany
For any legal inquiries or further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
changelog.md for @push.rocks/smartstream
2026-03-02 - 3.4.0 - feat(smartduplex)
improve backpressure handling and web/node stream interoperability
- Refactored SmartDuplex to use synchronous _read/_write/_final (avoids async pitfalls), added internal backpressured buffer draining and consumer signaling
- Implemented pull-based backpressure for Node <-> Web stream conversions (nodewebhelpers and convertNodeReadableToWebReadable/convertWebReadableToNodeReadable)
- StreamIntake.fromStream now reads from 'readable' and drains properly; StreamWrapper resolves safely on end/close/finish
- Improved getWebStreams / WebDuplexStream behavior (safer enqueue/close/abort handling, final/readFunction improvements)
- Added many new unit tests covering backpressure, web/node helpers, StreamIntake, StreamWrapper, WebDuplexStream; bumped @push.rocks/lik and @types/node versions
2026-02-28 - 3.3.0 - feat(smartstream)
bump dependencies, update build/publish config, refactor tests, and overhaul documentation
- Upgrade devDependencies (e.g. @git.zone/tsbuild -> ^4.1.2, @git.zone/tsrun -> ^2.0.1, @git.zone/tstest -> ^3.1.8, @types/node -> ^25.3.2) and runtime deps (e.g. @push.rocks/lik -> ^6.2.2, @push.rocks/smartenv -> ^6.0.0, @push.rocks/smartpromise -> ^4.2.3, @push.rocks/smartrx -> ^3.0.10).
- Refactor tests to use Node's native fs streams instead of @push.rocks/smartfile.fsStream and export default tap.start() to support ESM test runner patterns.
- Adjust build/publish: remove --web flag from build script, add pnpm override for agentkeepalive, add tspublish.json files for publish order, and add release registries/access in npmextra.json (verdaccio + npm).
- Rework project metadata in npmextra.json (namespaced @git.zone keys, tsdoc entry changes) and minor TypeScript/web fix: cast stream/web constructors to any in ts_web/plugins.ts.
- Large README rewrite: improved installation (pnpm), clearer Node vs Web entrypoints, expanded examples, and updated legal/license wording.
2024-11-19 - 3.2.5 - fix(nodewebhelpers)
Fix import and use correct module structure for Node.js streams in smartstream.nodewebhelpers.ts
- Corrected the import statement for the fs module.
- Updated the use of the fs.createReadStream method.
2024-10-16 - 3.2.4 - fix(SmartDuplex)
Fix stream termination when reading from a web readable stream
- Resolved an issue in SmartDuplex where the stream did not properly terminate after reaching the end of a web readable stream.
2024-10-16 - 3.2.3 - fix(smartduplex)
Enhance documentation for read function in SmartDuplex
- Added inline comments to clarify the behavior and importance of unlocking the reader in the readFunction of SmartDuplex.fromWebReadableStream.
2024-10-16 - 3.2.2 - fix(SmartDuplex)
Fix issue with SmartDuplex fromWebReadableStream method
- Resolved a potential unhandled promise rejection in fromWebReadableStream method
- Ensured proper release of stream reader lock in case of read completion
2024-10-16 - 3.2.1 - fix(core)
Fix the order of operations in SmartDuplex _read method to ensure proper waiting for items.
- Adjusted the order of reading function execution and waiting for items in the SmartDuplex _read method.
- Fixed potential issues with stream data processing timing.
2024-10-16 - 3.2.0 - feat(SmartDuplex)
Added method to create SmartDuplex from a WebReadableStream.
- Implemented a static method in SmartDuplex to allow creating an instance from a WebReadableStream.
- This addition enhances the capability of SmartDuplex to integrate with web streams, facilitating seamless stream manipulation across environments.
2024-10-14 - 3.1.2 - fix(WebDuplexStream)
Fix variable naming inconsistency in WebDuplexStream test
- Changed variable names from 'transformStream' to 'webDuplexStream' for consistency.
- Renamed 'writableStream' and 'readableStream' to 'writer' and 'reader' respectively.
2024-10-13 - 3.1.1 - fix(WebDuplexStream)
Improved read/write interface and error handling in WebDuplexStream
- Enhanced the IStreamToolsRead and IStreamToolsWrite interfaces for better Promise handling
- Refined readFunction and writeFunction handling to accommodate asynchronous data processing and error propagation
- Added internal _startReading method to facilitate initial data handling if readFunction is present
- Maintained backward compatibility while ensuring data continuity when no writeFunction is specified
2024-10-13 - 3.1.0 - feat(core)
Add support for creating Web ReadableStream from a file
- Introduced a new helper function
createWebReadableStreamFromFilethat allows for creating a Web ReadableStream from a file path. - Updated exports in
ts/index.tsto includenodewebhelperswhich provides the new web stream feature.
2024-10-13 - 3.0.46 - fix(WebDuplexStream)
Fix errors in WebDuplexStream transformation and test logic
- Corrected async handling in WebDuplexStream write function
- Fixed
WebDuplexStreamtests to properly handle asynchronous reading and writing
2024-10-13 - 3.0.45 - fix(ts)
Fixed formatting issues in SmartDuplex class
- Resolved inconsistent spacing in SmartDuplex class methods and constructor.
- Ensured consistent formatting in the getWebStreams method.
2024-06-02 - 3.0.39 - smartduplex
Add .getWebStreams method
- Introduced a new
.getWebStreamsmethod in the smartduplex module, providing compatibility with the web streams API.
2024-03-16 - 3.0.34 - configuration
Update project configuration files
- Updated
tsconfigfor optimization. - Modified
npmextra.jsonto set thegithostattribute.
2023-11-03 - 3.0.0 to 3.0.8 - core
Transition to major version 3.x
- Implemented breaking changes in the core system for better performance and feature set.
- Continuous core updates to improve stability and performance across minor version increments.
2023-11-02 - 2.0.4 to 2.0.8 - core
Core updates and a major fix
- Implemented core updates addressing minor bugs and enhancements.
- A significant breaking change update transitioning from 2.0.x to 3.0.0.
2022-03-31 - 2.0.0 - core
Major esm transition
- Implemented a breaking change by switching the core to ESM (ECMAScript Module) format for modernized module handling.