zudo-cloudflare-wisdom

Type to search...

to open search from anywhere

Workers AI ストリーミング SSE プロキシ

作成2026年5月28日Takeshi Takatsudo

Workers AI のストリームを SSE に変換し、クライアント切断時に中断し、ランタイムのキルスイッチでガードする

概要

このレシピでは、Workers AI のチャットモデルをプロキシし、そのレスポンスを Server-Sent Events (SSE) としてブラウザにストリーミングする Worker エンドポイントを 構築します。単に「env.AI.run を呼ぶだけ」のサンプルでは省かれがちな、3 つの本番運用上の 課題を扱います。

  • Workers AI の ReadableStreamtext/event-stream レスポンスに変換する。
  • クライアントが切断したら処理中の AI リクエストを中断し、閉じられたタブが Workers AI の クォータを消費し続けないようにする。
  • ランタイムのキルスイッチ(ダッシュボードで設定する単なる環境変数)で、再デプロイなしに 503 を返す。

本記事は zudo-text 同期サーバの POST /api/v1/chat ハンドラを下敷きにしています。

AI モデルのバインディング

Workers AI は wrangler.toml[ai] バインディング経由で公開されます。

[ai]
binding = "AI"

これにより Worker 内で env.AI が使えるようになります。

⚠️ 名前付き環境は [ai] を継承しません

トップレベルの [ai] バインディングは、名前付き環境には伝播しません。--env staging で デプロイする場合、その環境のもとでバインディングを再度書かないと、その環境の env.AIundefined になります。

[ai]
binding = "AI"

[env.staging.ai]
binding = "AI"

すべての名前付き環境に、それぞれ専用の [env.<name>.ai] ブロックが必要です。

ストリーミングでモデルを呼び出す

モデル ID は @cf/... 形式で渡し、stream: true を指定します。ストリーミングを有効に すると、run() は完成した JSON オブジェクトではなく ReadableStream を返します。

const KIMI_MODEL = "@cf/moonshotai/kimi-k2.6";

const kimiStream = await env.AI.run(KIMI_MODEL, {
  messages,
  stream: true,
});

このストリームは SSE 形式のフレーム、すなわち data: {...}\n\n という行を送出し、最後に 終端の data: [DONE]\n\n で終わります。

AI ストリームを SSE に変換する

Workers AI のストリームをそのまま素通しすることはできません。フレームをパースし、自前の SSE イベントとして再送出することで、ブラウザがきれいな text/event-stream を受け取れる ようにします。下記のイベント名(content_block_deltamessage_stopdoneerror)は このアプリ独自の取り決めです。フロントエンドのコンシューマが期待する任意の名前を使って かまいません。

const encoder = new TextEncoder();

function encodeSseEvent(event: string, data: unknown): Uint8Array {
  const payload = typeof data === "string" ? data : JSON.stringify(data);
  return encoder.encode(`event: ${event}\ndata: ${payload}\n\n`);
}

const stream = new ReadableStream<Uint8Array>({
  async start(controller) {
    const send = (event: string, data: unknown) => {
      controller.enqueue(encodeSseEvent(event, data));
    };

    const kimiStream = await env.AI.run(KIMI_MODEL, { messages, stream: true });
    const reader = kimiStream.getReader();
    const dec = new TextDecoder();
    let buf = "";
    let index = 0;

    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      buf += dec.decode(value, { stream: true });

      // Split on SSE frame boundaries (double newline).
      const parts = buf.split("\n\n");
      buf = parts.pop() ?? "";

      for (const part of parts) {
        const line = part.replace(/^data:\s*/m, "").trim();
        if (!line || line === "[DONE]") continue;

        let parsed: unknown;
        try {
          parsed = JSON.parse(line);
        } catch {
          continue; // malformed chunk — skip
        }

        // Workers AI chat-completions shape:
        //   { choices: [{ delta: { content: "..." } }] }
        // Older models use { response: "..." } — accept both.
        let text: string | null = null;
        if (parsed && typeof parsed === "object" && "choices" in parsed) {
          const choices = (parsed as { choices: Array<{ delta?: { content?: string } }> }).choices;
          text = choices?.[0]?.delta?.content ?? null;
        } else if (parsed && typeof parsed === "object" && "response" in parsed) {
          const r = (parsed as { response: unknown }).response;
          text = typeof r === "string" ? r : null;
        }

        if (text !== null && text !== "") {
          send("content_block_delta", { index, text });
          index++;
        }
      }
    }

    send("message_stop", {});
    send("done", {});
    controller.close();
  },
});

return new Response(stream, {
  status: 200,
  headers: {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache, no-transform",
  },
});

📝 no-transform を付ける理由

Cache-Control: no-cache, no-transform を指定すると、Cloudflare がストリームを バッファリングしたり書き換えたりしなくなります。これがないと中間装置がチャンクを溜め込み、 「ストリーミングらしさ」が失われることがあります。

クライアント切断時に中断する

ストリーミングの途中でユーザーがタブを閉じた場合、処理中の AI リクエストは最後まで走らせて 誰も読まないレスポンスにクォータを請求するのではなく、すぐに止めたいはずです。補完的な 2 つのフックがあり、両方を使うべきです。

  1. 受信リクエストの signal はクライアントがいなくなると発火します。これをリッスンします。
  2. ReadableStreamcancel() コールバックは、プラットフォームが送出側レスポンスを 破棄したときに発火します。これが実際に破棄を保証する側です。

両方を 1 つの AbortController に束ね、その signal を読み取りループ内でチェックします。

const aborter = new AbortController();

// (1) Client request signal → abort.
const clientSignal: AbortSignal | undefined = req.signal;
const onClientAbort = () => aborter.abort();
clientSignal?.addEventListener("abort", onClientAbort, { once: true });

const stream = new ReadableStream<Uint8Array>({
  async start(controller) {
    const send = (event: string, data: unknown) => {
      if (aborter.signal.aborted) return;
      controller.enqueue(encodeSseEvent(event, data));
    };

    const reader = kimiStream.getReader();
    while (true) {
      if (aborter.signal.aborted) break; // stop reading from Workers AI
      const { value, done } = await reader.read();
      if (done) break;
      // ...parse and send as above...
    }

    clientSignal?.removeEventListener("abort", onClientAbort);
    controller.close();
  },

  // (2) Response torn down by the platform → abort.
  cancel() {
    aborter.abort();
  },
});

aborter が発火すると、次の if (aborter.signal.aborted) break でループを抜け、reader が 解放されるため、Workers AI からそれ以上フレームが引き出されることはありません。

ランタイムのキルスイッチ

コストの急増、不正利用、あるいはモデルの不調など、エンドポイントを今すぐ止めたい場面が あります。デプロイを出さずにそれを実現するのが、リクエスト時に読み取る単なる環境変数です。 これがサーキットブレーカーになります。

// Kill switch — short-circuit before any heavy work.
// "1" = disabled; any other value (including unset) = enabled.
if (env.AI_CHAT_DISABLED === "1") {
  return Response.json({ error: "ai_unavailable" }, { status: 503 });
}

Cloudflare ダッシュボードの Settings → Variables and Secrets で設定します。

AI_CHAT_DISABLED = "1"

リクエストごとに読み取られるため、ダッシュボードで切り替えると次のリクエストから反映され、 再デプロイは不要です。チェックがちょうど === "1" である点に注意してください。未設定や その他の値ではエンドポイントは有効なままなので、デフォルト(変数なし)は「オン」です。

💡 wrangler.toml に記しておく

値自体はダッシュボードに置かれますが、wrangler.toml にコメントを残しておくと、次に読む人が この変数の存在と "1" の意味を把握できます。

# Kill switch for the chat endpoint.
# Set AI_CHAT_DISABLED = "1" in the Cloudflare dashboard (Settings →
# Variables and Secrets) to return 503 without a redeploy. Unset or any
# other value leaves the endpoint enabled.
# [vars]
# AI_CHAT_DISABLED = "0"

関連