SSE 事件流

SSE 事件流中,一个数据包就是一个 Token,其 content-type 应指定为 text/eventstream

  1. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"role": "assistant"}, "index": 0, "finish_reason": null}]}
  2. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "苹"}, "index": 0, "finish_reason": null}]}
  3. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "果"}, "index": 0, "finish_reason": null}]}
  4. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "公司"}, "index": 0, "finish_reason": null}]}
  5. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "是"}, "index": 0, "finish_reason": null}]}
  6. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "一"}, "index": 0, "finish_reason": null}]}
  7. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "家"}, "index": 0, "finish_reason": null}]}
  8. data: {"id": "chatcmpl-6w****KZb6hx****RzIghUz****Qy", "object": "chat.completion.chunk", "created": 16******79, "model": "gpt-3.5-turbo-0301", "choices": [{"delta": {"content": "科"}, "index": 0, "finish_reason": null}]}
  9. ...

前端处理

一种方式是后台处理上述 SSE 事件流返回单纯的文本流,另一种方式是前端处理 SSE 事件流,需要注意 ReadableStream 传输的是字节流,处理完成后的数据需要用 encoder 去编码。

  1. const response = await fetch('your fetch url', {
  2. method: 'POST',
  3. headers: {
  4. 'Content-Type': 'application/json;charset=UTF-8',
  5. ...
  6. },
  7. body: JSON.stringify({
  8. messages,
  9. stream: true,
  10. }),
  11. });
  12. // 确保服务器响应是成功的
  13. if (!response.ok) {
  14. throw new Error(`HTTP error! status: ${response.status}`);
  15. }
  16. // 获取 reader
  17. const reader = response.body.getReader();
  18. const decoder = new TextDecoder('utf-8');
  19. // 注意
  20. const encoder = new TextEncoder();
  21. // 构造新的 ReadableStream.
  22. const readableStream = new ReadableStream({
  23. async start(controller) {
  24. while (true) {
  25. const { done, value } = await reader.read();
  26. if (done) {
  27. break;
  28. }
  29. const message = decoder.decode(value, { stream: true });
  30. const lines = message
  31. .toString()
  32. .split('\n\n')
  33. .filter((line) => line.trim() !== '');
  34. for (const line of lines) {
  35. const message = line.replace('data: ', '');
  36. const parsed = JSON.parse(message);
  37. controller.enqueue(encoder.encode(parsed.choices[0].delta.content));
  38. }
  39. }
  40. controller.close();
  41. },
  42. });
  43. return new Response(readableStream);

从 Fetch 到 Streams —— 以流的角度处理网络请求 - 掘金