上級 api

Wasmストリームと非同期イテレータ

Webストリームとは?

Streams APIを使うと、ファイル全体をメモリに読み込む代わりに、データをインクリメンタルに処理できます。大容量ファイルを扱うWasmでは不可欠です — 500MBの動画をリニアメモリに一度に全部コピーしたくはありません。

┌──────────────────────────────────────────────────────────┐
│                  ストリームアーキテクチャ                   │
│                                                          │
│  ソース ──→ ReadableStream ──→ TransformStream ──→ シンク │
│             (生成)           (変換)       WritableStream │
│                                               (消費)     │
│                                                          │
│  ◄────────── バックプレッシャー信号が上流に流れる ────────►│
└──────────────────────────────────────────────────────────┘

ストリームには3種類あります:

ストリーム種別 目的 方向
ReadableStream データチャンクを生成 ソース → コンシューマ
WritableStream データチャンクを消費 プロデューサ → シンク
TransformStream データを転送中に変換 入力 → 出力

wasm-streamsクレート

wasm-streamsクレートはWeb Streams APIのRustバインディングを提供し、JSストリームをfuturesクレートのRust Stream/Sink型に変換します。

[dependencies]
wasm-streams = "0.4"
futures = "0.3"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["ReadableStream", "WritableStream"] }

Fetchレスポンスをストリームとして読む

最も一般的なユースケースはfetch()レスポンスのストリーミングです:

use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use wasm_streams::ReadableStream;
use futures::StreamExt;

#[wasm_bindgen]
pub async fn stream_fetch(url: &str) -> Result<(), JsValue> {
    let window = web_sys::window().unwrap();
    let resp: web_sys::Response = JsFuture::from(window.fetch_with_str(url))
        .await?
        .dyn_into()?;

    // レスポンスボディからReadableStreamを取得
    let raw_body = resp.body().unwrap();
    let body = ReadableStream::from_raw(raw_body);

    // RustのStreamに変換
    let mut reader = body.into_stream();

    let mut total_bytes = 0u64;
    while let Some(chunk) = reader.next().await {
        let chunk = chunk?;
        let array: js_sys::Uint8Array = chunk.dyn_into()?;
        total_bytes += array.length() as u64;
        // ここでチャンクを処理...
    }

    web_sys::console::log_1(
        &format!("{}バイトをストリーミング", total_bytes).into()
    );
    Ok(())
}

バックプレッシャー

バックプレッシャーは、高速なプロデューサが低速なコンシューマを圧倒するのを防ぐ仕組みです。これがないと、メモリ内に無制限にデータがバッファリングされます。

高速プロデューサ         低速コンシューマ
     │                      │
     │──── チャンク1 ───────►│ (処理中...)
     │──── チャンク2 ───────►│ (まだチャンク1を処理中)
     │──── チャンク3 ──► 待機│ ← バックプレッシャー信号
     │     (一時停止)       │
     │◄── 準備完了 ─────────│ (チャンク1の処理完了)
     │──── チャンク3 ───────►│ (再開)

Streams APIはハイウォーターマークキューサイズを通じてバックプレッシャーを自動的に処理します:

// wasm-streamsでは、バックプレッシャーは透過的に処理される:
let mut writer = writable_stream.into_sink();

// 内部キューが満杯の場合、自動的に待機する
writer.send(chunk).await?;  // バックプレッシャー時にサスペンドする可能性あり

ハイウォーターマークの設定

// JavaScript側 — キューイング戦略の設定
const stream = new ReadableStream({
    start(controller) { /* ... */ },
    pull(controller) { /* ... */ },
}, {
    highWaterMark: 3,  // バックプレッシャー前に最大3チャンクをバッファ
});

TransformStream:データの転送中処理

TransformStreamはReadableStreamとWritableStreamの間に位置し、通過するデータを変換します:

use wasm_streams::TransformStream;

#[wasm_bindgen]
pub fn create_uppercase_transform() -> web_sys::TransformStream {
    let transform = TransformStream::new(
        // 変換関数
        |chunk: JsValue, controller: &TransformStreamDefaultController| {
            let input: js_sys::Uint8Array = chunk.dyn_into().unwrap();
            let mut data = vec![0u8; input.length() as usize];
            input.copy_to(&mut data);

            // 変換: ASCIIを大文字に
            for byte in &mut data {
                if *byte >= b'a' && *byte <= b'z' {
                    *byte -= 32;
                }
            }

            let output = js_sys::Uint8Array::from(&data[..]);
            controller.enqueue(&output).unwrap();
            Ok(())
        },
    );

    transform.into_raw()
}

ストリームのパイプ接続

ストリームをパイプラインとして連結できます:

#[wasm_bindgen]
pub async fn process_file(input: web_sys::ReadableStream) -> Result<(), JsValue> {
    let readable = ReadableStream::from_raw(input);
    let mut stream = readable.into_stream();

    let mut processed_bytes = 0u64;
    let mut chunk_count = 0u32;

    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        let array: js_sys::Uint8Array = chunk.dyn_into()?;
        let len = array.length() as usize;

        // Wasmリニアメモリ内で処理
        let mut data = vec![0u8; len];
        array.copy_to(&mut data);

        // 例: チェックサムの計算
        let checksum: u32 = data.iter().map(|&b| b as u32).sum();

        processed_bytes += len as u64;
        chunk_count += 1;
    }

    Ok(())
}

大容量ファイルのストリーミング:完全な例

┌─────────┐    ┌───────────┐    ┌──────────────┐    ┌────────┐
│  ファイル │───►│ Readable  │───►│  Transform   │───►│ DBに   │
│  入力     │    │  Stream   │    │  (Wasm内)   │    │ 書き込み│
│ (ディスク)│    │ 64KB      │    │  圧縮/       │    │        │
│          │    │ チャンク   │    │  暗号化       │    │        │
└─────────┘    └───────────┘    └──────────────┘    └────────┘
                                       │
                              同時にメモリ内にあるのは
                              13チャンクだけ!
// JavaScript側: パイプラインの接続
const fileStream = file.stream();  // File APIからのReadableStream
const transform = wasm.create_compression_transform();

const compressed = fileStream.pipeThrough(transform);
const writer = getStorageWriter();
await compressed.pipeTo(writer);

Wasm向けRustの非同期イテレータ

RustのStreamトレイトfuturesクレート)はIteratorの非同期版です:

use futures::stream::{self, StreamExt};

#[wasm_bindgen]
pub async fn process_items() {
    let items = stream::iter(vec![1, 2, 3, 4, 5]);

    items
        .map(|x| x * 2)
        .filter(|x| futures::future::ready(*x > 4))
        .for_each(|x| async move {
            web_sys::console::log_1(&format!("Item: {}", x).into());
        })
        .await;
}

メモリに関する考慮事項

アプローチ メモリ使用量 レイテンシ
ファイル全体を読み込み O(ファイルサイズ) 高い(全ダウンロード完了を待つ)
64KBチャンクでストリーミング O(64KB * バッファ数) 低い(データ到着時に処理)
バックプレッシャー付きストリーミング O(ハイウォーターマーク) 適応的

100MBのファイルを64KBチャンクで処理する場合:

  • ストリーミングなし: Wasmリニアメモリに100MB
  • ストリーミングあり: Wasmリニアメモリに約192KB(3チャンクバッファ)

まとめ

  1. Webストリームはデータをインクリメンタルに処理でき、大きなメモリ割り当てを回避する
  2. wasm-streamsはJS Streams APIとRustのfutures::Streamおよびfutures::Sinkを橋渡しする
  3. バックプレッシャーは高速プロデューサが低速コンシューマを圧倒するのを防ぐ — Streams APIが自動的に処理する
  4. TransformStreamはWasmが入力全体をバッファリングせずに転送中のデータを処理できる
  5. 64KBチャンクは良いデフォルトチャンクサイズ — Wasmページサイズと一致し、スループットとメモリのバランスが取れている
  6. Wasmアプリケーションでは数MB以上のファイルには常にストリーミングを使用すること

試してみる