← レッスン一覧に戻る レッスン 30 / 48
上級 api
Wasmストリームと非同期イテレータ
Webストリームとは?
Streams APIを使うと、ファイル全体をメモリに読み込む代わりに、データをインクリメンタルに処理できます。大容量ファイルを扱うWasmでは不可欠です — 500MBの動画をリニアメモリに一度に全部コピーしたくはありません。
┌──────────────────────────────────────────────────────────┐
│ ストリームアーキテクチャ │
│ │
│ ソース ──→ ReadableStream ──→ TransformStream ──→ シンク │
│ (生成) (変換) WritableStream │
│ (消費) │
│ │
│ ◄────────── バックプレッシャー信号が上流に流れる ────────►│
└──────────────────────────────────────────────────────────┘ストリームには3種類あります:
| ストリーム種別 | 目的 | 方向 |
|---|---|---|
ReadableStream |
データチャンクを生成 | ソース → コンシューマ |
WritableStream |
データチャンクを消費 | プロデューサ → シンク |
TransformStream |
データを転送中に変換 | 入力 → 出力 |
wasm-streamsクレート
wasm-streamsクレートはWeb Streams APIのRustバインディングを提供し、JSストリームをfuturesクレートのRust Stream/Sink型に変換します。
[dependencies]
wasm-streams = "0.4"
futures = "0.3"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["ReadableStream", "WritableStream"] }Fetchレスポンスをストリームとして読む
最も一般的なユースケースはfetch()レスポンスのストリーミングです:
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use wasm_streams::ReadableStream;
use futures::StreamExt;
#[wasm_bindgen]
pub async fn stream_fetch(url: &str) -> Result<(), JsValue> {
let window = web_sys::window().unwrap();
let resp: web_sys::Response = JsFuture::from(window.fetch_with_str(url))
.await?
.dyn_into()?;
// レスポンスボディからReadableStreamを取得
let raw_body = resp.body().unwrap();
let body = ReadableStream::from_raw(raw_body);
// RustのStreamに変換
let mut reader = body.into_stream();
let mut total_bytes = 0u64;
while let Some(chunk) = reader.next().await {
let chunk = chunk?;
let array: js_sys::Uint8Array = chunk.dyn_into()?;
total_bytes += array.length() as u64;
// ここでチャンクを処理...
}
web_sys::console::log_1(
&format!("{}バイトをストリーミング", total_bytes).into()
);
Ok(())
}バックプレッシャー
バックプレッシャーは、高速なプロデューサが低速なコンシューマを圧倒するのを防ぐ仕組みです。これがないと、メモリ内に無制限にデータがバッファリングされます。
高速プロデューサ 低速コンシューマ
│ │
│──── チャンク1 ───────►│ (処理中...)
│──── チャンク2 ───────►│ (まだチャンク1を処理中)
│──── チャンク3 ──► 待機│ ← バックプレッシャー信号
│ (一時停止) │
│◄── 準備完了 ─────────│ (チャンク1の処理完了)
│──── チャンク3 ───────►│ (再開)Streams APIはハイウォーターマークとキューサイズを通じてバックプレッシャーを自動的に処理します:
// wasm-streamsでは、バックプレッシャーは透過的に処理される:
let mut writer = writable_stream.into_sink();
// 内部キューが満杯の場合、自動的に待機する
writer.send(chunk).await?; // バックプレッシャー時にサスペンドする可能性ありハイウォーターマークの設定
// JavaScript側 — キューイング戦略の設定
const stream = new ReadableStream({
start(controller) { /* ... */ },
pull(controller) { /* ... */ },
}, {
highWaterMark: 3, // バックプレッシャー前に最大3チャンクをバッファ
});TransformStream:データの転送中処理
TransformStreamはReadableStreamとWritableStreamの間に位置し、通過するデータを変換します:
use wasm_streams::TransformStream;
#[wasm_bindgen]
pub fn create_uppercase_transform() -> web_sys::TransformStream {
let transform = TransformStream::new(
// 変換関数
|chunk: JsValue, controller: &TransformStreamDefaultController| {
let input: js_sys::Uint8Array = chunk.dyn_into().unwrap();
let mut data = vec![0u8; input.length() as usize];
input.copy_to(&mut data);
// 変換: ASCIIを大文字に
for byte in &mut data {
if *byte >= b'a' && *byte <= b'z' {
*byte -= 32;
}
}
let output = js_sys::Uint8Array::from(&data[..]);
controller.enqueue(&output).unwrap();
Ok(())
},
);
transform.into_raw()
}ストリームのパイプ接続
ストリームをパイプラインとして連結できます:
#[wasm_bindgen]
pub async fn process_file(input: web_sys::ReadableStream) -> Result<(), JsValue> {
let readable = ReadableStream::from_raw(input);
let mut stream = readable.into_stream();
let mut processed_bytes = 0u64;
let mut chunk_count = 0u32;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
let array: js_sys::Uint8Array = chunk.dyn_into()?;
let len = array.length() as usize;
// Wasmリニアメモリ内で処理
let mut data = vec![0u8; len];
array.copy_to(&mut data);
// 例: チェックサムの計算
let checksum: u32 = data.iter().map(|&b| b as u32).sum();
processed_bytes += len as u64;
chunk_count += 1;
}
Ok(())
}大容量ファイルのストリーミング:完全な例
┌─────────┐ ┌───────────┐ ┌──────────────┐ ┌────────┐
│ ファイル │───►│ Readable │───►│ Transform │───►│ DBに │
│ 入力 │ │ Stream │ │ (Wasm内) │ │ 書き込み│
│ (ディスク)│ │ 64KB │ │ 圧縮/ │ │ │
│ │ │ チャンク │ │ 暗号化 │ │ │
└─────────┘ └───────────┘ └──────────────┘ └────────┘
│
同時にメモリ内にあるのは
1〜3チャンクだけ!// JavaScript側: パイプラインの接続
const fileStream = file.stream(); // File APIからのReadableStream
const transform = wasm.create_compression_transform();
const compressed = fileStream.pipeThrough(transform);
const writer = getStorageWriter();
await compressed.pipeTo(writer);Wasm向けRustの非同期イテレータ
RustのStreamトレイト(futuresクレート)はIteratorの非同期版です:
use futures::stream::{self, StreamExt};
#[wasm_bindgen]
pub async fn process_items() {
let items = stream::iter(vec![1, 2, 3, 4, 5]);
items
.map(|x| x * 2)
.filter(|x| futures::future::ready(*x > 4))
.for_each(|x| async move {
web_sys::console::log_1(&format!("Item: {}", x).into());
})
.await;
}メモリに関する考慮事項
| アプローチ | メモリ使用量 | レイテンシ |
|---|---|---|
| ファイル全体を読み込み | O(ファイルサイズ) | 高い(全ダウンロード完了を待つ) |
| 64KBチャンクでストリーミング | O(64KB * バッファ数) | 低い(データ到着時に処理) |
| バックプレッシャー付きストリーミング | O(ハイウォーターマーク) | 適応的 |
100MBのファイルを64KBチャンクで処理する場合:
- ストリーミングなし: Wasmリニアメモリに100MB
- ストリーミングあり: Wasmリニアメモリに約192KB(3チャンクバッファ)
まとめ
- Webストリームはデータをインクリメンタルに処理でき、大きなメモリ割り当てを回避する
- wasm-streamsはJS Streams APIとRustの
futures::Streamおよびfutures::Sinkを橋渡しする - バックプレッシャーは高速プロデューサが低速コンシューマを圧倒するのを防ぐ — Streams APIが自動的に処理する
- TransformStreamはWasmが入力全体をバッファリングせずに転送中のデータを処理できる
- 64KBチャンクは良いデフォルトチャンクサイズ — Wasmページサイズと一致し、スループットとメモリのバランスが取れている
- Wasmアプリケーションでは数MB以上のファイルには常にストリーミングを使用すること