← Back to Lessons Lesson 30 of 48
Advanced api

Wasm Streams & Async Iterators

What Are Web Streams?

The Streams API lets you process data incrementally instead of loading entire files into memory. This is essential for Wasm when handling large files — you don't want to copy a 500 MB video into linear memory all at once.

┌──────────────────────────────────────────────────────────┐
│                  Streams Architecture                     │
│                                                          │
│  Source ──→ ReadableStream ──→ TransformStream ──→ Sink  │
│             (produces)         (modifies)     WritableStream │
│                                               (consumes)     │
│                                                          │
│  ◄────────── Backpressure signal flows upstream ────────►│
└──────────────────────────────────────────────────────────┘

There are three types of streams:

Stream Type Purpose Direction
ReadableStream Produces data chunks Source → Consumer
WritableStream Consumes data chunks Producer → Sink
TransformStream Modifies data in-flight Input → Output

The wasm-streams Crate

The wasm-streams crate provides Rust bindings for the Web Streams API, converting JS streams into Rust Stream/Sink types from the futures crate.

[dependencies]
wasm-streams = "0.4"
futures = "0.3"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["ReadableStream", "WritableStream"] }

Reading a Fetch Response as a Stream

The most common use case is streaming a fetch() response:

use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use wasm_streams::ReadableStream;
use futures::StreamExt;

#[wasm_bindgen]
pub async fn stream_fetch(url: &str) -> Result<(), JsValue> {
    let window = web_sys::window().unwrap();
    let resp: web_sys::Response = JsFuture::from(window.fetch_with_str(url))
        .await?
        .dyn_into()?;

    // Get the ReadableStream from the response body
    let raw_body = resp.body().unwrap();
    let body = ReadableStream::from_raw(raw_body);

    // Convert to a Rust Stream
    let mut reader = body.into_stream();

    let mut total_bytes = 0u64;
    while let Some(chunk) = reader.next().await {
        let chunk = chunk?;
        let array: js_sys::Uint8Array = chunk.dyn_into()?;
        total_bytes += array.length() as u64;
        // Process chunk here...
    }

    web_sys::console::log_1(
        &format!("Streamed {} bytes", total_bytes).into()
    );
    Ok(())
}

Backpressure

Backpressure is the mechanism that prevents a fast producer from overwhelming a slow consumer. Without it, you'd buffer unbounded data in memory.

Fast Producer          Slow Consumer
     │                      │
     │──── chunk 1 ────────►│ (processing...)
     │──── chunk 2 ────────►│ (still processing chunk 1)
     │──── chunk 3 ──► WAIT │ ← backpressure signal
     │     (paused)         │
     │◄── ready ────────────│ (done with chunk 1)
     │──── chunk 3 ────────►│ (resume)

The Streams API handles backpressure automatically via the high-water mark and queue size:

// In wasm-streams, backpressure is handled transparently:
let mut writer = writable_stream.into_sink();

// This will automatically wait if the internal queue is full
writer.send(chunk).await?;  // may suspend if backpressured

High-Water Mark Configuration

// JavaScript side — configuring the queuing strategy
const stream = new ReadableStream({
    start(controller) { /* ... */ },
    pull(controller) { /* ... */ },
}, {
    highWaterMark: 3,  // Buffer up to 3 chunks before backpressure
});

TransformStream: Processing Data In-Flight

A TransformStream sits between a readable and writable stream, modifying data as it passes through:

use wasm_streams::TransformStream;

#[wasm_bindgen]
pub fn create_uppercase_transform() -> web_sys::TransformStream {
    let transform = TransformStream::new(
        // transform function
        |chunk: JsValue, controller: &TransformStreamDefaultController| {
            let input: js_sys::Uint8Array = chunk.dyn_into().unwrap();
            let mut data = vec![0u8; input.length() as usize];
            input.copy_to(&mut data);

            // Transform: uppercase ASCII
            for byte in &mut data {
                if *byte >= b'a' && *byte <= b'z' {
                    *byte -= 32;
                }
            }

            let output = js_sys::Uint8Array::from(&data[..]);
            controller.enqueue(&output).unwrap();
            Ok(())
        },
    );

    transform.into_raw()
}

Piping Streams Together

You can chain streams into a pipeline:

#[wasm_bindgen]
pub async fn process_file(input: web_sys::ReadableStream) -> Result<(), JsValue> {
    let readable = ReadableStream::from_raw(input);
    let mut stream = readable.into_stream();

    let mut processed_bytes = 0u64;
    let mut chunk_count = 0u32;

    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        let array: js_sys::Uint8Array = chunk.dyn_into()?;
        let len = array.length() as usize;

        // Process in Wasm linear memory
        let mut data = vec![0u8; len];
        array.copy_to(&mut data);

        // Example: compute checksum
        let checksum: u32 = data.iter().map(|&b| b as u32).sum();

        processed_bytes += len as u64;
        chunk_count += 1;
    }

    Ok(())
}

Streaming Large Files: A Complete Example

┌─────────┐    ┌───────────┐    ┌──────────────┐    ┌────────┐
│  File    │───►│ Readable  │───►│  Transform   │───►│ Write  │
│  Input   │    │  Stream   │    │  (in Wasm)   │    │ to DB  │
│  (disk)  │    │ 64KB      │    │  compress/   │    │        │
│          │    │ chunks    │    │  encrypt     │    │        │
└─────────┘    └───────────┘    └──────────────┘    └────────┘
                                       │
                              Only 1-3 chunks in
                              memory at a time!
// JavaScript side: wire up the pipeline
const fileStream = file.stream();  // ReadableStream from File API
const transform = wasm.create_compression_transform();

const compressed = fileStream.pipeThrough(transform);
const writer = getStorageWriter();
await compressed.pipeTo(writer);

Async Iterators in Rust for Wasm

Rust's Stream trait (from futures) is the async equivalent of Iterator:

use futures::stream::{self, StreamExt};

#[wasm_bindgen]
pub async fn process_items() {
    let items = stream::iter(vec![1, 2, 3, 4, 5]);

    items
        .map(|x| x * 2)
        .filter(|x| futures::future::ready(*x > 4))
        .for_each(|x| async move {
            web_sys::console::log_1(&format!("Item: {}", x).into());
        })
        .await;
}

Memory Considerations

Approach Memory Usage Latency
Load entire file O(file_size) High (wait for full download)
Stream with 64KB chunks O(64KB * buffer_count) Low (process as data arrives)
Stream with backpressure O(high_water_mark) Adaptive

For a 100 MB file with 64 KB chunks:

  • Without streaming: 100 MB in Wasm linear memory
  • With streaming: ~192 KB (3 buffered chunks) in Wasm linear memory

Key Takeaways

  1. Web Streams let you process data incrementally, avoiding large memory allocations
  2. wasm-streams bridges the JS Streams API with Rust's futures::Stream and futures::Sink
  3. Backpressure prevents fast producers from overwhelming slow consumers — the Streams API handles it automatically
  4. TransformStream lets Wasm process data in-flight without buffering the entire input
  5. 64 KB chunks are a good default chunk size — they match Wasm page size and balance throughput vs memory
  6. Always use streaming for files larger than a few MB in Wasm applications

Try It