Workers AI ストリーミング SSE プロキシ
Workers AI のストリームを SSE に変換し、クライアント切断時に中断し、ランタイムのキルスイッチでガードする
概要
このレシピでは、Workers AI のチャットモデルをプロキシし、そのレスポンスを
Server-Sent Events (SSE) としてブラウザにストリーミングする Worker エンドポイントを
構築します。単に「env.AI.run を呼ぶだけ」のサンプルでは省かれがちな、3 つの本番運用上の
課題を扱います。
- Workers AI の
ReadableStreamをtext/event-streamレスポンスに変換する。 - クライアントが切断したら処理中の AI リクエストを中断し、閉じられたタブが Workers AI の クォータを消費し続けないようにする。
- ランタイムのキルスイッチ(ダッシュボードで設定する単なる環境変数)で、再デプロイなしに
503を返す。
本記事は zudo-text 同期サーバの POST /api/v1/chat ハンドラを下敷きにしています。
AI モデルのバインディング
Workers AI は wrangler.toml の [ai] バインディング経由で公開されます。
[ai]
binding = "AI"
これにより Worker 内で env.AI が使えるようになります。
⚠️ 名前付き環境は [ai] を継承しません
トップレベルの [ai] バインディングは、名前付き環境には伝播しません。--env staging で
デプロイする場合、その環境のもとでバインディングを再度書かないと、その環境の env.AI は
undefined になります。
[ai]
binding = "AI"
[env.staging.ai]
binding = "AI"すべての名前付き環境に、それぞれ専用の [env.<name>.ai] ブロックが必要です。
ストリーミングでモデルを呼び出す
モデル ID は @cf/... 形式で渡し、stream: true を指定します。ストリーミングを有効に
すると、run() は完成した JSON オブジェクトではなく ReadableStream を返します。
const KIMI_MODEL = "@cf/moonshotai/kimi-k2.6";
const kimiStream = await env.AI.run(KIMI_MODEL, {
messages,
stream: true,
});
このストリームは SSE 形式のフレーム、すなわち data: {...}\n\n という行を送出し、最後に
終端の data: [DONE]\n\n で終わります。
AI ストリームを SSE に変換する
Workers AI のストリームをそのまま素通しすることはできません。フレームをパースし、自前の
SSE イベントとして再送出することで、ブラウザがきれいな text/event-stream を受け取れる
ようにします。下記のイベント名(content_block_delta、message_stop、done、error)は
このアプリ独自の取り決めです。フロントエンドのコンシューマが期待する任意の名前を使って
かまいません。
const encoder = new TextEncoder();
function encodeSseEvent(event: string, data: unknown): Uint8Array {
const payload = typeof data === "string" ? data : JSON.stringify(data);
return encoder.encode(`event: ${event}\ndata: ${payload}\n\n`);
}
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const send = (event: string, data: unknown) => {
controller.enqueue(encodeSseEvent(event, data));
};
const kimiStream = await env.AI.run(KIMI_MODEL, { messages, stream: true });
const reader = kimiStream.getReader();
const dec = new TextDecoder();
let buf = "";
let index = 0;
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += dec.decode(value, { stream: true });
// Split on SSE frame boundaries (double newline).
const parts = buf.split("\n\n");
buf = parts.pop() ?? "";
for (const part of parts) {
const line = part.replace(/^data:\s*/m, "").trim();
if (!line || line === "[DONE]") continue;
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
continue; // malformed chunk — skip
}
// Workers AI chat-completions shape:
// { choices: [{ delta: { content: "..." } }] }
// Older models use { response: "..." } — accept both.
let text: string | null = null;
if (parsed && typeof parsed === "object" && "choices" in parsed) {
const choices = (parsed as { choices: Array<{ delta?: { content?: string } }> }).choices;
text = choices?.[0]?.delta?.content ?? null;
} else if (parsed && typeof parsed === "object" && "response" in parsed) {
const r = (parsed as { response: unknown }).response;
text = typeof r === "string" ? r : null;
}
if (text !== null && text !== "") {
send("content_block_delta", { index, text });
index++;
}
}
}
send("message_stop", {});
send("done", {});
controller.close();
},
});
return new Response(stream, {
status: 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
},
});
📝 no-transform を付ける理由
Cache-Control: no-cache, no-transform を指定すると、Cloudflare がストリームを
バッファリングしたり書き換えたりしなくなります。これがないと中間装置がチャンクを溜め込み、
「ストリーミングらしさ」が失われることがあります。
クライアント切断時に中断する
ストリーミングの途中でユーザーがタブを閉じた場合、処理中の AI リクエストは最後まで走らせて 誰も読まないレスポンスにクォータを請求するのではなく、すぐに止めたいはずです。補完的な 2 つのフックがあり、両方を使うべきです。
- 受信リクエストの
signalはクライアントがいなくなると発火します。これをリッスンします。 ReadableStreamのcancel()コールバックは、プラットフォームが送出側レスポンスを 破棄したときに発火します。これが実際に破棄を保証する側です。
両方を 1 つの AbortController に束ね、その signal を読み取りループ内でチェックします。
const aborter = new AbortController();
// (1) Client request signal → abort.
const clientSignal: AbortSignal | undefined = req.signal;
const onClientAbort = () => aborter.abort();
clientSignal?.addEventListener("abort", onClientAbort, { once: true });
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const send = (event: string, data: unknown) => {
if (aborter.signal.aborted) return;
controller.enqueue(encodeSseEvent(event, data));
};
const reader = kimiStream.getReader();
while (true) {
if (aborter.signal.aborted) break; // stop reading from Workers AI
const { value, done } = await reader.read();
if (done) break;
// ...parse and send as above...
}
clientSignal?.removeEventListener("abort", onClientAbort);
controller.close();
},
// (2) Response torn down by the platform → abort.
cancel() {
aborter.abort();
},
});
aborter が発火すると、次の if (aborter.signal.aborted) break でループを抜け、reader が
解放されるため、Workers AI からそれ以上フレームが引き出されることはありません。
ランタイムのキルスイッチ
コストの急増、不正利用、あるいはモデルの不調など、エンドポイントを今すぐ止めたい場面が あります。デプロイを出さずにそれを実現するのが、リクエスト時に読み取る単なる環境変数です。 これがサーキットブレーカーになります。
// Kill switch — short-circuit before any heavy work.
// "1" = disabled; any other value (including unset) = enabled.
if (env.AI_CHAT_DISABLED === "1") {
return Response.json({ error: "ai_unavailable" }, { status: 503 });
}
Cloudflare ダッシュボードの Settings → Variables and Secrets で設定します。
AI_CHAT_DISABLED = "1"
リクエストごとに読み取られるため、ダッシュボードで切り替えると次のリクエストから反映され、
再デプロイは不要です。チェックがちょうど === "1" である点に注意してください。未設定や
その他の値ではエンドポイントは有効なままなので、デフォルト(変数なし)は「オン」です。
💡 wrangler.toml に記しておく
値自体はダッシュボードに置かれますが、wrangler.toml にコメントを残しておくと、次に読む人が
この変数の存在と "1" の意味を把握できます。
# Kill switch for the chat endpoint.
# Set AI_CHAT_DISABLED = "1" in the Cloudflare dashboard (Settings →
# Variables and Secrets) to return 503 without a redeploy. Unset or any
# other value leaves the endpoint enabled.
# [vars]
# AI_CHAT_DISABLED = "0"