ハイパーマッスルエンジニア

Vim、ShellScriptについてよく書く

Next.jsを使ったServer Sent Events (SSE)の実現方法

Server Sent Events (SSE)は、サーバからクライアントにリアルタイムで情報をプッシュするための技術です。この記事では、そのフロントエンドでの実装方法をいくつか紹介します。

完成品

今回実装したものは下記に収めてます。Next.jsを使っています。

github.com

下記で実際にブラウザで動作確認できます。
https://server-sent-events-example.vercel.app

ChatGPTのようなUIを実装するのに色々な方法を試しました。

SSEのサーバー側のコード

サーバーサイドの設定は以下のとおりです。

  1. SSEをサポートするエンドポイントを作成します。
  2. レスポンスのヘッダーに'Content-Type': 'text/event-stream'を設定し、接続を開いたままにします。
  3. 必要に応じて、特定の間隔でデータを送信します。

pages/api/events.ts

import type { NextApiRequest, NextApiResponse } from 'next'

export default function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('Content-Encoding', 'none');

  const text = '夜空に広がる無数の星々の中、ひときわ明るく輝く星がありました。'

  let index = 0;
  const intervalId = setInterval(() => {
    if (index < text.length) {
      res.write(`data: ${text[index]}\n\n`);
      index++;
    } else {
      res.write(`data: [DONE]\n\n`); // ChatGPTっぽく終端文字を挿入
      clearInterval(intervalId);
      res.end();
    }
  }, 100);

  req.on('close', () => {
    clearInterval(intervalId);
    res.end();
  });
}

この状態でhttp://localhost:3000/api/eventsにリクエストを送ると下記のようなデータが流れ込んでくるはずです。

curl "http://localhost:3000/api/events"

# data:
# 
# data: 夜
# 
# data: 空
# 
# data: に
#
# ...
# 
# data: た
# 
# data: 。
# 
# data: [DONE]

サーバー側はこれで終了です。SSEのデータはdata:ではじまり\n\nで終わる形になります。data:の他にもideventが種類としてありますので、詳しくは以下のサイトがわかりやすかったです。

ja.javascript.info

フロント側のコード

フロント側でSSEを受け取る方法はEventSourceを使ったものとfetchを使ったものの2種類があります。 一旦シンプルなEventSourceで実装してみましょう。

EventSourceによる実装

EventSourceでの実装は以下のとおりです。

  1. EventSourceインターフェースを使用してサーバーに接続します。
  2. サーバーからのイベントに対応するためのリスナーを設定します。

pages/index.tsx

import { useEffect, useState } from 'react';

export default function Home() {
  const [message, setMessage] = useState('');

  useEffect(() => {
    const eventSource = new EventSource('/api/events');

    eventSource.onmessage = (event) => {
      setMessage(prev => prev + event.data);
    };

    return () => {
      eventSource.close();
    };
  }, []);

  return (
      <p>{message}</p>
  );
}

この状態でページにアクセスすると以下のようになるはずです。終端までいくと自動で再度接続します。

以上でSSEのサーバー・フロントの実装は完了です。思ったよりシンプルかと思います。
ここからはフロント側の実装に焦点を当てていきます。フロント側はEventSourceの他にfetchでもSSEを受け取ることが可能です。fetchのほうがより柔軟に複雑な要件に対応できるので、業務で使う場合はおそらくfetchでやることが多いのかなと思います。まずはライブラリを利用しないピュアなfetchでの実装方法を見ていきましょう。

fetchによる実装

コードは以下のような形です。(こちらのページが非常に参考になりました、ありがとうございます!)

pages/fetch.tsx

import { useEffect, useState } from 'react'

export default function Home() {
  const [message, setMessage] = useState('')

  const onClick = async () => {
    const response = await fetch('/api/events', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ prompt: 'hoge' }) // GPTのAPIの場合はここでプロンプトを指定
    })
    const reader = response.body?.getReader()
    if (!reader) return

    let decoder = new TextDecoder()
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      if (!value) continue
      const lines = decoder.decode(value)
      const text = lines.split('data: ')[1].trim()
      setMessage(prev => prev + text)
    }
  }

  return (
    <div>
      <button onClick={onClick}>START</button>
      <p>{message}</p>
    </div>
  )
}

以下のような感じで動作します。fetchなので終端までいったあと、自動で再接続は行われません。

EventSourceではGETリクエストしか使えないので、リクエストに必要な情報(例えばプロンプト文)などをすべてURLにGETパラメータとして含めないといけません。ほとんどのブラウザでURLの最大文字数は2000文字程度なので、画像情報などを渡すとすぐに制限に引っかかってしまいます。
fetchであればGET以外にもPOSTなどが使えるので、リクエストBodyに必要な情報をいれることができます。また、接続が切断されたあとのリトライ制御もfetchのほうがやりやすかったり、カスタムヘッダーを渡せたりします。
このような理由から、ちゃんとしたアプリケーションでSSEを使うとなるとEventSourceでは物足りず、fetchを使った実装になるかと思います。

ただ、上記を考慮すると自前で実装するのもなかなか大変なので、ライブラリを使うのが良さそうです。ここではSSEに対応したライブラリを2つ紹介します。

Azure/fetch-event-source

github.com

Microsoftが提供しているライブラリです。使い方もEventSourceとほぼ変わらず、エラー処理はonerror, onclose, onopen, onmessageなどイベントリスナーのように適切に処理が可能になっています。
コードは以下のような形です。

pages/fetch-event-source.tsx

import { useEffect, useState } from 'react'
import {EventStreamContentType, fetchEventSource} from '@microsoft/fetch-event-source'

class RetriableError extends Error { }
class FatalError extends Error { }

export default function Home() {
  const [message, setMessage] = useState('')

  const onClick = async () => {
    fetchEventSource('/api/events', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ prompt: 'hoge' }), // GPTのAPIの場合はここでプロンプトを指定
      async onopen(response) {
          if (response.ok && response.headers.get('content-type') === EventStreamContentType) {
              return;
          } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
              throw new FatalError();
          } else {
              throw new RetriableError();
          }
      },
      onmessage(msg) {
          if (msg.event === 'FatalError') {
              throw new FatalError(msg.data);
          }
          setMessage(prev => prev + msg.data)
      },
      onclose() {
          throw new RetriableError();
      },
      onerror(err) {
          if (err instanceof FatalError) {
              throw err; // rethrow to stop the operation
          }
      }
    });
  }

  return (
    <div>
      <button onClick={onClick}>START</button>
      <p>{message}</p>
    </div>
  )
}

少し複雑に見えるかもしれませんが、各イベントに対してエラー処理が可能なので安心感があります。

Vercel AI SDK

続いてVercel社が出しているVercel AI SDKです。

sdk.vercel.ai

こちらはSSEのためのライブラリではなく、「AIを活用したユーザーインターフェースを構築するためのライブラリ」なので若干毛色は違いますが、単純なSSEにも利用できます。コードは以下のような形です。

pages/vercel-ai-sdk.tsx

import { useEffect, useState } from 'react'
import { useChat, useCompletion } from 'ai/react';

export default function Home() {
  const { messages, setInput, handleSubmit } = useChat({api: '/api/events'});
  return (
    <div>
      <form onSubmit={handleSubmit}>
        <button type="submit" onClick={() => setInput('inputに値がないとsubmitが発動しないので適当にいれる')}>START</button>
      </form>
      {messages.map(m => {
        if (m.role === 'user') return
        const text = m.content.split('data: ').map(line => line.trim()).filter(s => s).join('');
        return (
        <div key={m.id}>
          {text}
        </div>
      )})}
    </div>
  );
}

カスタムフックを利用する形になっていて、利用する側のコードはかなりシンプルです。チャットUIを提供するものなのでmessagesの型にrole(userまたはassistant)があってその辺を若干ごにょらないといけなかったりしますが、使い勝手はなかなか良いです。

Vercelにデプロイするときの注意

今回実装したものは、このままVercelにデプロイしてもSSEが機能しません。VercelのAPI RouteはServerless Functionsで実行されるので、タイムアウト制限に引っ掛かりストリームを処理できないからです。
VercelでSSEを動かすためにはAPI Routeの設定でランタイムをedgeにし、ストリームを処理する形に変える必要があります。
まず、フロント側はそのままでOKです。サーバー側を以下のようにruntime:edgeにしReadableStreamを利用した形にします。

pages/api/events-for-nextjs.ts

import type { NextApiRequest, NextApiResponse } from "next";

export const config = {
  runtime: "edge",
};

const handler = async (req: Request): Promise<Response> => {
  const text = '夜空に広がる無数の星々の中、ひときわ明るく輝く星がありました。'
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    start(controller) {
      let index = 0;
      const intervalId = setInterval(() => {
        if (index < text.length) {
          const chunkText = `data: ${text[index]}\n\n`;
          const queue = encoder.encode(chunkText);
          controller.enqueue(queue);
          index++;
        } else {
          controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
          controller.close();
          clearInterval(intervalId);
        }
      }, 100);
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream; charset=utf-8" },
  });
};
export default handler;

動作確認は以下で可能です。

https://server-sent-events-example.vercel.app/vercel-ai-sdk

ブラウザで/api/events-for-vercelにアクセスすると、SSEを体感できます。

https://server-sent-events-example.vercel.app/api/events-for-vercel

ズダダダダダと文字列が流れてくる

まとめ

Server Sent Events (SSE)の実装についてまとめました。上記をすべてまとめたリポジトリが下記になります。

github.com

SSEはフロント側は様々な実装方法があるので、要件に適した実装方法を選ぶことができたらよいですね。
なんにせよChatGPTのようなUIは作ってて楽しいです。