zudo-cloudflare-wisdom

Workers AI Streaming SSE Proxy

Created May 28, 2026 by Takeshi Takatsudo

Re-frame a Workers AI stream into SSE, abort on client disconnect, and gate it with a runtime kill switch

Overview

This recipe builds a Worker endpoint that proxies a Workers AI chat model and streams the response to the browser as Server-Sent Events (SSE). It covers three production concerns that the basic “call env.AI.run” examples skip:

  • Re-framing the Workers AI ReadableStream into a text/event-stream response.
  • Aborting the in-flight AI request when the client disconnects, so a closed tab stops burning Workers AI quota.
  • A runtime kill switch (a plain dashboard env var) that returns 503 without a redeploy.

It is grounded in the POST /api/v1/chat handler from the zudo-text sync server.

Binding the AI model

Workers AI is exposed through an [ai] binding in wrangler.toml:

[ai]
binding = "AI"

That gives you env.AI in the Worker.

⚠️ Named environments do NOT inherit [ai]

The top-level [ai] binding does not propagate into named environments. If you deploy with --env staging, you must repeat the binding under that environment or env.AI will be undefined there:

[ai]
binding = "AI"

[env.staging.ai]
binding = "AI"

Every named environment needs its own [env.<name>.ai] block.
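
A missing binding otherwise surfaces as an opaque "cannot read properties of undefined" error at request time. As a minimal sketch, a guard like the one below fails with a message that points at the config. The names requireAi, AiLike, and EnvWithOptionalAi are hypothetical and not part of the original handler; a real project would likely use the Ai type from @cloudflare/workers-types instead of AiLike.

```typescript
// Hypothetical minimal shape of the binding, for illustration only.
interface AiLike {
  run(model: string, inputs: Record<string, unknown>): Promise<unknown>;
}

// The binding is optional here because a named environment may lack it.
interface EnvWithOptionalAi {
  AI?: AiLike;
}

// Return the binding, or throw an error that names the missing config block.
function requireAi(env: EnvWithOptionalAi, envName: string): AiLike {
  if (!env.AI) {
    throw new Error(
      `AI binding missing: add [env.${envName}.ai] with binding = "AI" to wrangler.toml`,
    );
  }
  return env.AI;
}
```

Calling requireAi(env, "staging") at the top of the handler turns a forgotten [env.staging.ai] block into an immediate, readable failure.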

Calling the model with streaming

Pass the model ID in @cf/... form and set stream: true. With streaming on, run() resolves to a ReadableStream rather than a finished JSON object:

const KIMI_MODEL = "@cf/moonshotai/kimi-k2.6";

const kimiStream = await env.AI.run(KIMI_MODEL, {
  messages,
  stream: true,
});

The stream emits SSE-style frames: data: {...}\n\n lines, ending with a terminal data: [DONE]\n\n.
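
Network chunks do not line up with frame boundaries, so a frame can arrive split across two reads. The sketch below shows one way to separate complete frames from the unfinished tail. splitSseFrames is a hypothetical helper, not part of the original handler, and it assumes each frame carries a single data: line.

```typescript
// Hypothetical helper: split a text buffer into the payloads of complete
// `data:` frames plus the unfinished tail that must wait for more input.
function splitSseFrames(buffer: string): { payloads: string[]; rest: string } {
  const parts = buffer.split("\n\n");
  // The last element is "" (buffer ended on a boundary) or a partial frame.
  const rest = parts.pop() ?? "";
  const payloads: string[] = [];
  for (const part of parts) {
    for (const line of part.split("\n")) {
      if (!line.startsWith("data:")) continue; // ignore comments and other fields
      const payload = line.slice("data:".length).trim();
      if (payload && payload !== "[DONE]") payloads.push(payload);
    }
  }
  return { payloads, rest };
}

// First chunk ends in the middle of the second frame.
const first = splitSseFrames('data: {"response":"Hel"}\n\ndata: {"resp');
// Second chunk completes that frame and carries the terminal [DONE].
const second = splitSseFrames(first.rest + 'onse":"lo"}\n\ndata: [DONE]\n\n');
```

Here first.payloads holds one complete JSON payload and first.rest holds the partial frame; prepending that tail to the next chunk yields the second payload, and the [DONE] marker is dropped.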

Re-framing the AI stream into SSE

A Worker can hand the Workers AI stream to the browser unchanged, but that ties the frontend to the model’s wire format, which differs between models. Parsing the frames and re-emitting your own SSE events gives the browser a stable text/event-stream contract instead. The event names below (content_block_delta, message_stop, done, error) are this app’s convention; pick whatever names your frontend consumer expects.

const encoder = new TextEncoder();

function encodeSseEvent(event: string, data: unknown): Uint8Array {
  const payload = typeof data === "string" ? data : JSON.stringify(data);
  return encoder.encode(`event: ${event}\ndata: ${payload}\n\n`);
}

const stream = new ReadableStream<Uint8Array>({
  async start(controller) {
    const send = (event: string, data: unknown) => {
      controller.enqueue(encodeSseEvent(event, data));
    };

    try {
      const kimiStream = await env.AI.run(KIMI_MODEL, { messages, stream: true });
      const reader = kimiStream.getReader();
      const dec = new TextDecoder();
      let buf = "";
      let index = 0;

      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        buf += dec.decode(value, { stream: true });

        // Split on SSE frame boundaries (double newline). The last element
        // is a partial frame; keep it in buf until the next chunk arrives.
        const parts = buf.split("\n\n");
        buf = parts.pop() ?? "";

        for (const part of parts) {
          const line = part.replace(/^data:\s*/m, "").trim();
          if (!line || line === "[DONE]") continue;

          let parsed: unknown;
          try {
            parsed = JSON.parse(line);
          } catch {
            continue; // malformed chunk: skip
          }

          // Workers AI chat-completions shape:
          //   { choices: [{ delta: { content: "..." } }] }
          // Older models use { response: "..." }. Accept both.
          let text: string | null = null;
          if (parsed && typeof parsed === "object" && "choices" in parsed) {
            const choices = (parsed as { choices: Array<{ delta?: { content?: string } }> }).choices;
            text = choices?.[0]?.delta?.content ?? null;
          } else if (parsed && typeof parsed === "object" && "response" in parsed) {
            const r = (parsed as { response: unknown }).response;
            text = typeof r === "string" ? r : null;
          }

          if (text !== null && text !== "") {
            send("content_block_delta", { index, text });
            index++;
          }
        }
      }

      send("message_stop", {});
      send("done", {});
    } catch {
      // The 200 status has already been sent, so report failures in-band.
      // The payload shape here is illustrative; match your frontend.
      send("error", { error: "ai_stream_failed" });
    } finally {
      controller.close();
    }
  },
});

return new Response(stream, {
  status: 200,
  headers: {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache, no-transform",
  },
});

📝 Why no-transform

Cache-Control: no-cache, no-transform tells caches and intermediaries, Cloudflare included, not to store or modify the response body. Transformations such as on-the-fly compression can hold chunks back until enough data has accumulated, and the “streaming” feel disappears.
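
On the browser side, EventSource only issues GET requests, so a POST endpoint like this one is typically consumed with fetch and a stream reader. The following is a minimal sketch of such a consumer for the event names used above. parseSseEvent and readChatStream are hypothetical names, and the { messages } request body is an assumption about the endpoint’s input.

```typescript
interface SseEvent {
  event: string;
  data: string;
}

// Parse one "event: ...\ndata: ..." frame (without the trailing blank line).
// Returns null for frames that carry no data line, such as comments.
function parseSseEvent(frame: string): SseEvent | null {
  let event = "message"; // SSE default when no event field is present
  const dataLines: string[] = [];
  for (const line of frame.split("\n")) {
    if (line.startsWith("event:")) event = line.slice("event:".length).trim();
    else if (line.startsWith("data:")) dataLines.push(line.slice("data:".length).trim());
  }
  return dataLines.length > 0 ? { event, data: dataLines.join("\n") } : null;
}

// Hypothetical client: POST the messages and call onText for each delta.
async function readChatStream(
  url: string,
  messages: Array<{ role: string; content: string }>,
  onText: (text: string) => void,
): Promise<void> {
  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }), // assumed request shape
  });
  if (!res.ok || !res.body) throw new Error(`chat request failed: ${res.status}`);

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buf = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buf += decoder.decode(value, { stream: true });
    const frames = buf.split("\n\n");
    buf = frames.pop() ?? ""; // keep the partial frame for the next chunk
    for (const frame of frames) {
      const evt = parseSseEvent(frame);
      if (!evt) continue;
      if (evt.event === "content_block_delta") {
        onText((JSON.parse(evt.data) as { text: string }).text);
      } else if (evt.event === "done") {
        return;
      }
    }
  }
}
```

A caller could append each delta to the page with something like readChatStream("/api/v1/chat", messages, (t) => { output.textContent += t; }), where output is whatever element displays the reply.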

Aborting on client disconnect

If the user closes the tab mid-stream, you want the in-flight AI request to stop immediately rather than run to completion and bill quota for a response nobody will read. There are two complementary hooks, and you want both:

  1. The incoming request’s signal fires when the client goes away — listen on it.
  2. The ReadableStream’s cancel() callback fires when the platform tears down the outgoing response — this is the one that actually guarantees teardown.

Wire both into a single AbortController and check its signal inside the read loop:

const aborter = new AbortController();

// (1) Client request signal → abort.
const clientSignal: AbortSignal | undefined = req.signal;
const onClientAbort = () => aborter.abort();
clientSignal?.addEventListener("abort", onClientAbort, { once: true });

const stream = new ReadableStream<Uint8Array>({
  async start(controller) {
    const send = (event: string, data: unknown) => {
      if (aborter.signal.aborted) return;
      controller.enqueue(encodeSseEvent(event, data));
    };

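    const reader = kimiStream.getReader();
    try {
      while (true) {
        if (aborter.signal.aborted) break; // stop reading from Workers AI
        const { value, done } = await reader.read();
        if (done) break;
        // ...parse and send as above...
      }
    } finally {
      clientSignal?.removeEventListener("abort", onClientAbort);
      if (aborter.signal.aborted) {
        // Tell the upstream stream that no more data is wanted.
        await reader.cancel().catch(() => {});
      }
      try {
        controller.close();
      } catch {
        // Already closed because cancel() ran; nothing left to do.
      }
    }
  },

  // (2) Response torn down by the platform → abort.
  cancel() {
    aborter.abort();
  },
});

Once aborter fires, the next if (aborter.signal.aborted) check exits the loop, and reader.cancel() signals the Workers AI stream that no more data is wanted, so no further frames are pulled. The close() call is wrapped in try/catch because closing a controller whose stream was already cancelled throws a TypeError.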
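
The loop checks the signal only between reads, so a read() that is already pending keeps waiting for the next chunk. One way to cut that wait short, sketched here under assumptions, is to cancel the reader from the abort listener: per the Streams standard, cancelling settles a pending read() with done: true. pumpUntilAborted is a hypothetical helper, not part of the original handler.

```typescript
// Hypothetical helper: drain `source`, calling onChunk for each chunk, and
// cancel the source as soon as `signal` aborts, even while a read is pending.
async function pumpUntilAborted<T>(
  source: ReadableStream<T>,
  signal: AbortSignal,
  onChunk: (chunk: T) => void,
): Promise<"finished" | "aborted"> {
  const reader = source.getReader();
  // Cancelling the reader resolves a pending read() with { done: true }
  // and propagates the cancellation to the underlying source.
  const onAbort = () => {
    void reader.cancel().catch(() => {});
  };
  if (signal.aborted) onAbort();
  else signal.addEventListener("abort", onAbort, { once: true });

  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done || signal.aborted) break;
      onChunk(value as T);
    }
    return signal.aborted ? "aborted" : "finished";
  } finally {
    signal.removeEventListener("abort", onAbort);
    reader.releaseLock();
  }
}
```

In the handler this would take the place of the manual read loop, with aborter.signal as the signal and the frame parsing inside onChunk. Whether Workers AI stops generating promptly after the cancel is up to the platform; the sketch only guarantees that the Worker stops waiting.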

Runtime kill switch

Sometimes you need to turn the endpoint off right now — a cost spike, abuse, or a misbehaving model — without shipping a deploy. A plain environment variable read at request time gives you that circuit breaker:

// Kill switch — short-circuit before any heavy work.
// "1" = disabled; any other value (including unset) = enabled.
if (env.AI_CHAT_DISABLED === "1") {
  return Response.json({ error: "ai_unavailable" }, { status: 503 });
}

Set it in the Cloudflare dashboard under Settings → Variables and Secrets:

AI_CHAT_DISABLED = "1"

Because the check runs on every request, flipping the variable takes effect as soon as the dashboard change is saved and rolled out, with no code change and no wrangler deploy. Note the exact check is === "1": unset or any other value leaves the endpoint enabled, so the default (no variable set) is “on”.
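
The strict comparison is easy to get wrong when the check is copied around, so it can help to isolate it in one function. This is a minimal sketch: killSwitchResponse and KillSwitchEnv are hypothetical names, and the Retry-After header is an optional addition that is not in the original handler.

```typescript
// Only the variable the kill switch reads; a real Env has more bindings.
interface KillSwitchEnv {
  AI_CHAT_DISABLED?: string;
}

// Returns a 503 response when the switch is on, or null to continue.
// Only the exact string "1" disables the endpoint.
function killSwitchResponse(env: KillSwitchEnv): Response | null {
  if (env.AI_CHAT_DISABLED !== "1") return null;
  return new Response(JSON.stringify({ error: "ai_unavailable" }), {
    status: 503,
    headers: {
      "Content-Type": "application/json",
      // Assumption: hint to clients when to retry; tune or drop as needed.
      "Retry-After": "60",
    },
  });
}
```

In the handler the first lines would then read: const blocked = killSwitchResponse(env); if (blocked) return blocked;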

💡 Document it in wrangler.toml

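Even though the value lives in the dashboard, leave a commented note in wrangler.toml so the next reader knows the variable exists and what "1" means. If you also deploy with Wrangler, check that a deploy does not overwrite variables set in the dashboard; Wrangler’s keep_vars = true setting exists for that case.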

# Kill switch for the chat endpoint.
# Set AI_CHAT_DISABLED = "1" in the Cloudflare dashboard (Settings →
# Variables and Secrets) to return 503 without a redeploy. Unset or any
# other value leaves the endpoint enabled.
# [vars]
# AI_CHAT_DISABLED = "0"