Server Sent Events (SSE)は、サーバからクライアントにリアルタイムで情報をプッシュするための技術です。この記事では、そのフロントエンドでの実装方法をいくつか紹介します。
完成品
今回実装したものは下記に収めてます。Next.jsを使っています。
下記で実際にブラウザで動作確認できます。
https://server-sent-events-example.vercel.app
ChatGPTのようなUIを実装するのに色々な方法を試しました。
SSEのサーバー側のコード
サーバーサイドの設定は以下のとおりです。
- SSEをサポートするエンドポイントを作成します。
- レスポンスのヘッダーに'Content-Type': 'text/event-stream'を設定し、接続を開いたままにします。
- 必要に応じて、特定の間隔でデータを送信します。
pages/api/events.ts
import type { NextApiRequest, NextApiResponse } from 'next' export default function handler( req: NextApiRequest, res: NextApiResponse ) { res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Encoding', 'none'); const text = '夜空に広がる無数の星々の中、ひときわ明るく輝く星がありました。' let index = 0; const intervalId = setInterval(() => { if (index < text.length) { res.write(`data: ${text[index]}\n\n`); index++; } else { res.write(`data: [DONE]\n\n`); // ChatGPTっぽく終端文字を挿入 clearInterval(intervalId); res.end(); } }, 100); req.on('close', () => { clearInterval(intervalId); res.end(); }); }
この状態でhttp://localhost:3000/api/events
にリクエストを送ると下記のようなデータが流れ込んでくるはずです。
curl "http://localhost:3000/api/events" # data: # # data: 夜 # # data: 空 # # data: に # # ... # # data: た # # data: 。 # # data: [DONE]
サーバー側はこれで終了です。SSEのデータはdata:
ではじまり\n\n
で終わる形になります。data:
の他にもid
やevent
が種類としてありますので、詳しくは以下のサイトがわかりやすかったです。
フロント側のコード
フロント側でSSEを受け取る方法はEventSource
を使ったものとfetch
を使ったものの2種類があります。
一旦シンプルなEventSourceで実装してみましょう。
EventSourceによる実装
EventSourceでの実装は以下のとおりです。
- EventSourceインターフェースを使用してサーバーに接続します。
- サーバーからのイベントに対応するためのリスナーを設定します。
pages/index.tsx
import { useEffect, useState } from 'react'; export default function Home() { const [message, setMessage] = useState(''); useEffect(() => { const eventSource = new EventSource('/api/events'); eventSource.onmessage = (event) => { setMessage(prev => prev + event.data); }; return () => { eventSource.close(); }; }, []); return ( <p>{message}</p> ); }
この状態でページにアクセスすると以下のようになるはずです。終端までいくと自動で再度接続します。
以上でSSEのサーバー・フロントの実装は完了です。思ったよりシンプルかと思います。
ここからはフロント側の実装に焦点を当てていきます。フロント側はEventSourceの他にfetchでもSSEを受け取ることが可能です。fetchのほうがより柔軟に複雑な要件に対応できるので、業務で使う場合はおそらくfetchでやることが多いのかなと思います。まずはライブラリを利用しないピュアなfetchでの実装方法を見ていきましょう。
fetchによる実装
コードは以下のような形です。(こちらのページが非常に参考になりました、ありがとうございます!)
pages/fetch.tsx
import { useEffect, useState } from 'react' export default function Home() { const [message, setMessage] = useState('') const onClick = async () => { const response = await fetch('/api/events', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ prompt: 'hoge' }) // GPTのAPIの場合はここでプロンプトを指定 }) const reader = response.body?.getReader() if (!reader) return let decoder = new TextDecoder() while (true) { const { done, value } = await reader.read() if (done) break if (!value) continue const lines = decoder.decode(value) const text = lines.split('data: ')[1].trim() setMessage(prev => prev + text) } } return ( <div> <button onClick={onClick}>START</button> <p>{message}</p> </div> ) }
以下のような感じで動作します。fetchなので終端までいったあと、自動で再接続は行われません。
EventSourceではGETリクエストしか使えないので、リクエストに必要な情報(例えばプロンプト文)などをすべてURLにGETパラメータとして含めないといけません。ほとんどのブラウザでURLの最大文字数は2000文字程度なので、画像情報などを渡すとすぐに制限に引っかかってしまいます。
fetchであればGET以外にもPOSTなどが使えるので、リクエストBodyに必要な情報をいれることができます。また、接続が切断されたあとのリトライ制御もfetchのほうがやりやすかったり、カスタムヘッダーを渡せたりします。
このような理由から、ちゃんとしたアプリケーションでSSEを使うとなるとEventSourceでは物足りず、fetchを使った実装になるかと思います。
ただ、上記を考慮すると自前で実装するのもなかなか大変なので、ライブラリを使うのが良さそうです。ここではSSEに対応したライブラリを2つ紹介します。
Azure/fetch-event-source
Microsoftが提供しているライブラリです。使い方もEventSourceとほぼ変わらず、エラー処理はonerror
, onclose
, onopen
, onmessage
などイベントリスナーのように適切に処理が可能になっています。
コードは以下のような形です。
pages/fetch-event-source.tsx
import { useEffect, useState } from 'react' import {EventStreamContentType, fetchEventSource} from '@microsoft/fetch-event-source' class RetriableError extends Error { } class FatalError extends Error { } export default function Home() { const [message, setMessage] = useState('') const onClick = async () => { fetchEventSource('/api/events', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ prompt: 'hoge' }), // GPTのAPIの場合はここでプロンプトを指定 async onopen(response) { if (response.ok && response.headers.get('content-type') === EventStreamContentType) { return; } else if (response.status >= 400 && response.status < 500 && response.status !== 429) { throw new FatalError(); } else { throw new RetriableError(); } }, onmessage(msg) { if (msg.event === 'FatalError') { throw new FatalError(msg.data); } setMessage(prev => prev + msg.data) }, onclose() { throw new RetriableError(); }, onerror(err) { if (err instanceof FatalError) { throw err; // rethrow to stop the operation } } }); } return ( <div> <button onClick={onClick}>START</button> <p>{message}</p> </div> ) }
少し複雑に見えるかもしれませんが、各イベントに対してエラー処理が可能なので安心感があります。
Vercel AI SDK
続いてVercel社が出しているVercel AI SDKです。
こちらはSSEのためのライブラリではなく、「AIを活用したユーザーインターフェースを構築するためのライブラリ」なので若干毛色は違いますが、単純なSSEにも利用できます。コードは以下のような形です。
pages/vercel-ai-sdk.tsx
import { useEffect, useState } from 'react' import { useChat, useCompletion } from 'ai/react'; export default function Home() { const { messages, setInput, handleSubmit } = useChat({api: '/api/events'}); return ( <div> <form onSubmit={handleSubmit}> <button type="submit" onClick={() => setInput('inputに値がないとsubmitが発動しないので適当にいれる')}>START</button> </form> {messages.map(m => { if (m.role === 'user') return const text = m.content.split('data: ').map(line => line.trim()).filter(s => s).join(''); return ( <div key={m.id}> {text} </div> )})} </div> ); }
カスタムフックを利用する形になっていて、利用する側のコードはかなりシンプルです。チャットUIを提供するものなのでmessagesの型にrole(userまたはassistant)
があってその辺を若干ごにょらないといけなかったりしますが、使い勝手はなかなか良いです。
Vercelにデプロイするときの注意
今回実装したものは、このままVercelにデプロイしてもSSEが機能しません。VercelのAPI RouteはServerless Functionsで実行されるので、タイムアウト制限に引っ掛かりストリームを処理できないからです。
VercelでSSEを動かすためにはAPI Routeの設定でランタイムをedgeにし、ストリームを処理する形に変える必要があります。
まず、フロント側はそのままでOKです。サーバー側を以下のようにruntime:edge
にしReadableStream
を利用した形にします。
pages/api/events-for-nextjs.ts
import type { NextApiRequest, NextApiResponse } from "next"; export const config = { runtime: "edge", }; const handler = async (req: Request): Promise<Response> => { const text = '夜空に広がる無数の星々の中、ひときわ明るく輝く星がありました。' const encoder = new TextEncoder(); const stream = new ReadableStream({ start(controller) { let index = 0; const intervalId = setInterval(() => { if (index < text.length) { const chunkText = `data: ${text[index]}\n\n`; const queue = encoder.encode(chunkText); controller.enqueue(queue); index++; } else { controller.enqueue(encoder.encode(`data: [DONE]\n\n`)); controller.close(); clearInterval(intervalId); } }, 100); }, }); return new Response(stream, { headers: { "Content-Type": "text/event-stream; charset=utf-8" }, }); }; export default handler;
動作確認は以下で可能です。
https://server-sent-events-example.vercel.app/vercel-ai-sdk
ブラウザで/api/events-for-vercel
にアクセスすると、SSEを体感できます。
https://server-sent-events-example.vercel.app/api/events-for-vercel
まとめ
Server Sent Events (SSE)の実装についてまとめました。上記をすべてまとめたリポジトリが下記になります。
SSEはフロント側は様々な実装方法があるので、要件に適した実装方法を選ぶことができたらよいですね。
なんにせよChatGPTのようなUIは作ってて楽しいです。