跳转至内容
There is a version suitable for your browser's language settings. Would you like to go to the english language of the site?
主页文档

Stream

H1

Milkio 支持以流的形式提供响应。源源不断地返回数据,直到服务器或客户端任意一方选择停止。

最常见的使用场景,是 LLM 的生成,一次性生成完所有的回复需要让用户等待很久,而通过流的方式每生成一些就向用户发送一些数据,可以让用户避免等待带来的不愉快。

除此之外,流也常常用来实现服务器主动向客户端发送消息的功能。拿聊天功能举例,我们可以在页面创建时就调用 API,此 API 用来向用户推送他所收到的消息。一旦有另一个人向他发送消息,该消息就会通过该流主动推送给客户端,这使我们无需引入 WebSocket,就能实现这样的功能。

使用

将 API 的 action 由普通的异步函数改为异步生成器函数,我们就可以使 API 以流的形式提供响应。异步生成器函数和普通的异步函数的区别,是在名称前添加了 *,并且使用 yield 来替代 return

当我们想向客户端响应一次数据时,只需要使用 yield 关键字即可,并且我们可以多次使用它。下面的示例尝试每秒向客户端推送一次当前的时间。

/src/app/your-stream-api
export const api = defineApi({
meta: {},
async *action(params: { name: string }, context) {
while (true) {
yield `你好,${params.name}!现在的时间是:`;
yield new Date().toLocaleString();
await Bun.sleep(1000);
}
},
});

Milkio Client

Milkio Client 可以通过 executeStream 方法来调用,结果中包含 stream 对象,它是一个 AsyncGenerator,你可以通过循环来处理它。

const { stream } = client.executeStream("your-stream-api", { params: { name: 'furina' } });
for await (const chunk of stream) {
console.log(chunk);
}

自动重试

当我们订阅消息时,我们可能希望流一直不会关闭。但由于网络的不稳定性,流总有可能被关闭。我们可以在流停止时自动重连。

有些 Serverless 平台或者 CDN 或者网关也会限制我们 API 的最大运行时间,当这个值达到后我们的连接也将被迫中断。

我们可以简单地在外层添加一个循环来不断地重试。

while (true) {
const { getError, stream } = client.executeStream("your-stream-api", { name: 'furina' });
for await (const chunk of stream) {
console.log(chunk);
}
// 在重试前,添加 3 秒延迟
await new Promise((resolve) => setTimeout(resolve, 3000));
}

与 EventSource 不同,你可以自由控制重试的细节,这意味着,你可以实现当用户长时间无操作,或将网页置于后台时,自动停止流,以节省服务器资源。

结果

你依然可以获取到像普通 API 一样的结果。我们可以通过结果来判断,流是否是在运行期间遭遇了错误而导致的结束。

const { getResult, stream } = client.executeStream("foo", { params: { name: 'furina' } });
for await (const chunk of stream) {
console.log(chunk);
}
const result = getResult();
if (result.success) console.log("流是正常结束的");
else console.log(`流是因遭遇错误结束的`, result.fail);

请在流结束之后,再调用 getResult 方法来获取结果。否则,你可能会先得到成功的结果,再得到失败的结果。因为在失败之前,实际上流是成功的。

停止

你可以随时通过调用 return 方法来停止流。停止流并非是忽略后续的数据并跳出循环,而是真正的停止,会同时停止服务器中代码的继续执行。

await stream.return(undefined);

除此以外,使用 break 语句直接跳出循环,也会自动停止流。

Server-Sent Events

Milkio 的流在响应格式上兼容 Server-Sent Events。

但是你应该使用 Milkio 的客户端,而非浏览器中原生的 EventSource 对象。你会获得以下好处:

  • 可以向 Milkio 传递参数和 header,EventSource 只能发送 GET 请求,这意味着你无法携带 body,也无法更改 header。

  • 自由地控制重试逻辑和间隔,EventSource 永远都会重试,且重试间隔是浏览器内置的(3000 毫秒)。

  • 可以在浏览器之外使用,例如另一个 TypeScript 服务器。

如果你被迫使用 Milkio Client 之外的方式与 Milkio 应用通信,那么你可以采用任意支持 Server-Sent Events 规范的库,来与 Milkio 的 Stream API 通信。但 Milkio 和完整的 Server-Sent Events 规范,将会有一些细微的差异:

  • 没有 Event ID。

  • 服务器无法控制客户端重试间隔。

  • 请求头必须固定为 Accept: text/event-stream

  • 如果你需要携带参数,请以 POST 方法发送请求,并将内容在请求体中,以 JSON(或 TSON)格式发送。