Wasm Streams & Async Iterators
What Are Web Streams?
The Streams API lets you process data incrementally instead of loading entire files into memory. This is essential for Wasm when handling large files — you don't want to copy a 500 MB video into linear memory all at once.
┌──────────────────────────────────────────────────────────┐
│ Streams Architecture │
│ │
│ Source ──→ ReadableStream ──→ TransformStream ──→ Sink │
│ (produces) (modifies) WritableStream │
│ (consumes) │
│ │
│ ◄────────── Backpressure signal flows upstream ────────►│
└──────────────────────────────────────────────────────────┘There are three types of streams:
| Stream Type | Purpose | Direction |
|---|---|---|
ReadableStream |
Produces data chunks | Source → Consumer |
WritableStream |
Consumes data chunks | Producer → Sink |
TransformStream |
Modifies data in-flight | Input → Output |
The wasm-streams Crate
The wasm-streams crate provides Rust bindings for the Web Streams API, converting JS streams into Rust Stream/Sink types from the futures crate.
[dependencies]
wasm-streams = "0.4"
futures = "0.3"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["ReadableStream", "WritableStream"] }Reading a Fetch Response as a Stream
The most common use case is streaming a fetch() response:
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use wasm_streams::ReadableStream;
use futures::StreamExt;
#[wasm_bindgen]
pub async fn stream_fetch(url: &str) -> Result<(), JsValue> {
let window = web_sys::window().unwrap();
let resp: web_sys::Response = JsFuture::from(window.fetch_with_str(url))
.await?
.dyn_into()?;
// Get the ReadableStream from the response body
let raw_body = resp.body().unwrap();
let body = ReadableStream::from_raw(raw_body);
// Convert to a Rust Stream
let mut reader = body.into_stream();
let mut total_bytes = 0u64;
while let Some(chunk) = reader.next().await {
let chunk = chunk?;
let array: js_sys::Uint8Array = chunk.dyn_into()?;
total_bytes += array.length() as u64;
// Process chunk here...
}
web_sys::console::log_1(
&format!("Streamed {} bytes", total_bytes).into()
);
Ok(())
}Backpressure
Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. Without it, you'd buffer unbounded data in memory.
Fast Producer Slow Consumer
│ │
│──── chunk 1 ────────►│ (processing...)
│──── chunk 2 ────────►│ (still processing chunk 1)
│──── chunk 3 ──► WAIT │ ← backpressure signal
│ (paused) │
│◄── ready ────────────│ (done with chunk 1)
│──── chunk 3 ────────►│ (resume)The Streams API handles backpressure automatically via the high-water mark and queue size:
// In wasm-streams, backpressure is handled transparently:
let mut writer = writable_stream.into_sink();
// This will automatically wait if the internal queue is full
writer.send(chunk).await?; // may suspend if backpressuredHigh-Water Mark Configuration
// JavaScript side — configuring the queuing strategy
const stream = new ReadableStream({
start(controller) { /* ... */ },
pull(controller) { /* ... */ },
}, {
highWaterMark: 3, // Buffer up to 3 chunks before backpressure
});TransformStream: Processing Data In-Flight
A TransformStream sits between a readable and writable stream, modifying data as it passes through:
use wasm_streams::TransformStream;
#[wasm_bindgen]
pub fn create_uppercase_transform() -> web_sys::TransformStream {
let transform = TransformStream::new(
// transform function
|chunk: JsValue, controller: &TransformStreamDefaultController| {
let input: js_sys::Uint8Array = chunk.dyn_into().unwrap();
let mut data = vec![0u8; input.length() as usize];
input.copy_to(&mut data);
// Transform: uppercase ASCII
for byte in &mut data {
if *byte >= b'a' && *byte <= b'z' {
*byte -= 32;
}
}
let output = js_sys::Uint8Array::from(&data[..]);
controller.enqueue(&output).unwrap();
Ok(())
},
);
transform.into_raw()
}Piping Streams Together
You can chain streams into a pipeline:
#[wasm_bindgen]
pub async fn process_file(input: web_sys::ReadableStream) -> Result<(), JsValue> {
let readable = ReadableStream::from_raw(input);
let mut stream = readable.into_stream();
let mut processed_bytes = 0u64;
let mut chunk_count = 0u32;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
let array: js_sys::Uint8Array = chunk.dyn_into()?;
let len = array.length() as usize;
// Process in Wasm linear memory
let mut data = vec![0u8; len];
array.copy_to(&mut data);
// Example: compute checksum
let checksum: u32 = data.iter().map(|&b| b as u32).sum();
processed_bytes += len as u64;
chunk_count += 1;
}
Ok(())
}Streaming Large Files: A Complete Example
┌─────────┐ ┌───────────┐ ┌──────────────┐ ┌────────┐
│ File │───►│ Readable │───►│ Transform │───►│ Write │
│ Input │ │ Stream │ │ (in Wasm) │ │ to DB │
│ (disk) │ │ 64KB │ │ compress/ │ │ │
│ │ │ chunks │ │ encrypt │ │ │
└─────────┘ └───────────┘ └──────────────┘ └────────┘
│
Only 1-3 chunks in
memory at a time!// JavaScript side: wire up the pipeline
const fileStream = file.stream(); // ReadableStream from File API
const transform = wasm.create_compression_transform();
const compressed = fileStream.pipeThrough(transform);
const writer = getStorageWriter();
await compressed.pipeTo(writer);Async Iterators in Rust for Wasm
Rust's Stream trait (from futures) is the async equivalent of Iterator:
use futures::stream::{self, StreamExt};
#[wasm_bindgen]
pub async fn process_items() {
let items = stream::iter(vec![1, 2, 3, 4, 5]);
items
.map(|x| x * 2)
.filter(|x| futures::future::ready(*x > 4))
.for_each(|x| async move {
web_sys::console::log_1(&format!("Item: {}", x).into());
})
.await;
}Memory Considerations
| Approach | Memory Usage | Latency |
|---|---|---|
| Load entire file | O(file_size) | High (wait for full download) |
| Stream with 64KB chunks | O(64KB * buffer_count) | Low (process as data arrives) |
| Stream with backpressure | O(high_water_mark) | Adaptive |
For a 100 MB file with 64 KB chunks:
- Without streaming: 100 MB in Wasm linear memory
- With streaming: ~192 KB (3 buffered chunks) in Wasm linear memory
Key Takeaways
- Web Streams let you process data incrementally, avoiding large memory allocations
- wasm-streams bridges the JS Streams API with Rust's
futures::Streamandfutures::Sink - Backpressure prevents fast producers from overwhelming slow consumers — the Streams API handles it automatically
- TransformStream lets Wasm process data in-flight without buffering the entire input
- 64 KB chunks are a good default chunk size — they match Wasm page size and balance throughput vs memory
- Always use streaming for files larger than a few MB in Wasm applications