本文由45岁老架构师尼恩分享,感谢原作者,即时通讯网有修订和重新排版。
1.png (30.34 KB, 下载次数: 10)
下载附件 保存到相册
6 天前 上传
2.png (24.06 KB, 下载次数: 8)
3.png (41.45 KB, 下载次数: 6)
4.png (20.72 KB, 下载次数: 7)
5.png (26.09 KB, 下载次数: 8)
6.png (78.81 KB, 下载次数: 7)
7.png (26.66 KB, 下载次数: 9)
8.png (33.14 KB, 下载次数: 5)
9.png (38.24 KB, 下载次数: 9)
AI 应用需要“打字机”式的逐 token 输出体验,而 SSE 作为一种基于 HTTP 的、简单的、单向的服务器推送技术,是实现这种体验最自然、最高效、最可靠的技术选择。
10.png (37.17 KB, 下载次数: 6)
Accept: text/event-stream
Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive
event: message data: {"time": "2023-10-05T12:00:00", "value": "New update!"} id: 12345 retry: 5000 \n\n
11.png (31.86 KB, 下载次数: 7)
12.png (5.8 KB, 下载次数: 5)
// 检查浏览器是否支持SSE if ('EventSource' in window) { // 支持SSE,可正常使用 console.log('浏览器支持SSE'); } else { // 不支持SSE,需降级处理 console.log('浏览器不支持SSE'); }
// 建立与服务器的SSE连接 // url为服务器提供的SSE接口地址(可同域或跨域) var source = new EventSource(url);
// 跨域请求时,允许携带Cookie var source = new EventSource(url, { withCredentials: true // 默认为false,设为true表示跨域请求携带Cookie });
13.png (17.97 KB, 下载次数: 8)
if (source.readyState === EventSource.OPEN) { console.log('SSE连接已正常建立'); }
14.png (23.66 KB, 下载次数: 7)
// 方式1:使用onopen属性 source.onopen = function (event) { console.log('SSE连接已建立'); // 可在此处做连接成功后的初始化操作,如更新UI状态 }; // 方式2:使用addEventListener(推荐,可添加多个回调) source.addEventListener('open', function (event) { console.log('SSE连接已建立(监听方式)'); }, false);
// 方式1:使用onmessage属性 source.onmessage = function (event) { // event.data为服务器推送的文本数据 var data = event.data; console.log('收到数据:', data); // 可在此处处理数据,如更新页面内容 }; // 方式2:使用addEventListener source.addEventListener('message', function (event) { var data = event.data; console.log('收到数据(监听方式):', data); }, false);
// 方式1:使用onerror属性 source.onerror = function (event) { // 可根据readyState判断错误类型 if (source.readyState === EventSource.CONNECTING) { console.log('连接出错,正在尝试重连...'); } else { console.log('连接已关闭,无法重连'); } }; // 方式2:使用addEventListener source.addEventListener('error', function (event) { // 错误处理逻辑 }, false);
// 主动关闭SSE连接 source.close(); console.log('SSE连接已手动关闭');
// 监听名为"order"的自定义事件 source.addEventListener('order', function (event) { var orderData = event.data; console.log('收到新订单:', orderData); // 处理订单相关逻辑 }, false); // 再监听一个名为"notice"的自定义事件 source.addEventListener('notice', function (event) { var noticeData = event.data; console.log('收到系统公告:', noticeData); // 处理公告相关逻辑 }, false);
Content-Type: text/event-stream // 必须,指定为事件流类型 Cache-Control: no-cache // 必须,禁止缓存,确保数据实时性 Connection: keep-alive // 必须,保持长连接
: 这是一条注释(客户端会忽略)\n data: 这是第一条消息\n\n data: 这是第二条消息的第一行\n data: 这是第二条消息的第二行\n\n
data: Hello, SSE!\n\n // 单行数据,以\n\n结束 多行数据(适合JSON等复杂结构): ``` data: {\n // 第一行以\n结束 data: "name": "张三",\n // 第二行以\n结束 data: "age": 20\n // 第三行以\n结束 data: }\n\n // 最后一行以\n\n结束
event: order\n // 指定事件类型为order data: 新订单ID:12345\n // 消息内容 \n // 消息结束(\n\n简化为单独一行)
source.addEventListener('order', function(event) { console.log(event.data); // 输出:新订单ID:12345 });
id: msg1001\n // 消息标识 data: 这是第1001条消息\n \n
Last-Event-ID: msg1001 // 自动携带最后收到的id
retry: 5000\n // 告诉客户端,断线后5秒再重连 data: 重连间隔已设置为5秒\n \n
: 这是保持连接活动的注释行\n : 服务器时间 2023-10-05T12:00:00\n
15.png (27.12 KB, 下载次数: 8)
: 服务器开始发送消息(注释)\n id: 1001\n event: notice\n data: 系统将在10分钟后维护\n\n id: 1002\n event: order\n data: {"orderId": "20230501", "status": "paid"}\n\n retry: 10000\n id: 1003\n data: 重连间隔已调整为10秒\n\n : 这是保持连接活动的注释行\n : 服务器时间 2023-10-05T12:00:00\n
16.png (30.56 KB, 下载次数: 9)
<dependencies> <!-- Spring Web:提供SSE相关类和HTTP服务 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Thymeleaf:用于渲染前端页面 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> </dependencies>
package com.example.sse.controller; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @RestController public class SseController { // 存储所有活跃的SSE连接(线程安全的列表) // CopyOnWriteArrayList适合读多写少场景,避免并发问题 private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>(); // 线程池:用于异步发送事件,避免阻塞主线程 private final ExecutorService executor = Executors.newCachedThreadPool(); / * 客户端订阅SSE的接口 * 客户端通过访问该接口建立长连接,接收服务器推送的事件 */ @GetMapping(value = "/sse/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter subscribe() { // 创建SseEmitter实例,设置超时时间为无限(默认30秒会超时,这里设为Long.MAX_VALUE避免自动断开) SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 将新连接加入活跃列表(后续推送消息时会遍历这个列表) emitters.add(emitter); // 设置连接完成/超时的回调:从活跃列表中移除该连接,释放资源 emitter.onCompletion(() -> emitters.remove(emitter)); // 连接正常关闭时 emitter.onTimeout(() -> emitters.remove(emitter)); // 连接超时关闭时 // 发送初始连接成功消息(给客户端的"欢迎消息") try { emitter.send(SseEmitter.event() .name("CONNECTED") // 事件名称:客户端可通过"CONNECTED"事件监听 .data("You are successfully connected to SSE server!") // 消息内容 .reconnectTime(5000)); // 告诉客户端:如果断开连接,5秒后重连 } catch (IOException e) { // 发送失败时,标记连接异常结束 emitter.completeWithError(e); } return emitter; // 将emitter返回给客户端,保持连接 } / * 广播消息接口:向所有已连接的客户端推送消息 * 可通过浏览器访问 http://localhost:8080/sse/broadcast?message=xxx 触发 / @GetMapping("/sse/broadcast") public String broadcastMessage(@RequestParam String message) { // 用线程池异步执行广播,避免阻塞当前请求 executor.execute(() -> { // 遍历所有活跃连接,逐个发送消息 for (SseEmitter emitter : emitters) { try { emitter.send(SseEmitter.event() .name("BROADCAST") // 事件名称:客户端监听"BROADCAST"事件 .data(message) // 广播的消息内容 .id(String.valueOf(System.currentTimeMillis()))); // 消息ID(用于重连时定位) } catch (IOException e) { // 发送失败(可能客户端已断开),从列表中移除并标记连接结束 emitters.remove(emitter); emitter.completeWithError(e); } } }); return "Broadcast message: " + message; // 给调用者的响应 } / * 模拟长时间任务:向客户端推送实时进度 * 适合文件上传、数据处理等需要实时反馈进度的场景 / @GetMapping("/sse/start-task") public String startTask() { // 异步执行任务,避免阻塞当前请求 executor.execute(() -> { try { // 模拟任务进度:从0%到100%,每次增加10% for (int i = 0; i <= 100; i += 10) { Thread.sleep(1000); // 休眠1秒,模拟处理耗时 // 向所有客户端推送当前进度 for (SseEmitter emitter : emitters) { try { emitter.send(SseEmitter.event() .name("PROGRESS") // 事件名称:客户端监听"PROGRESS"事件 .data(i + "% completed") // 进度数据 .id("task-progress")); // 固定ID,标识这是任务进度消息 } catch (IOException e) { // 发送失败,移除连接 emitters.remove(emitter); } } // 任务完成时,发送结束消息 if (i == 100) { for (SseEmitter emitter : emitters) { try { emitter.send(SseEmitter.event() .name("COMPLETE") // 事件名称:客户端监听"COMPLETE"事件 .data("Task completed successfully!")); } catch (IOException e) { emitters.remove(emitter); } } } } } catch (InterruptedException e) { // 任务被中断时,恢复线程中断状态并退出 Thread.currentThread().interrupt(); break; } }); return "Task started!"; // 告诉调用者任务已启动 } }
package com.example.sse.controller; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; @Controller // 注意这里用@Controller而非@RestController,用于返回页面 public class PageController { /* * 访问根路径时,返回SSE客户端页面 / @GetMapping("/") public String index() { // 返回src/main/resources/templates目录下的sse-client.html return "sse-client"; } }
Server-Sent Events (SSE) Client Connect to SSE Disconnect Send Broadcast Start Task Messages:
17.png (18.32 KB, 下载次数: 9)
18.png (41.99 KB, 下载次数: 8)
19.png (27.88 KB, 下载次数: 6)
20.png (19.71 KB, 下载次数: 7)
21.png (33.85 KB, 下载次数: 5)
class NetworkProbe { // 关键指标 static RTT_THRESHOLD = 300 // RTT超过300ms视为弱网 static PACKET_LOSS_THRESHOLD = 0.2 // 丢包率>20%触发降级 // 网络状态检测 async check() { const { rtt, packetLoss } = await this._measure() return { isWeak: rtt > NetworkProbe.RTT_THRESHOLD || packetLoss > NetworkProbe.PACKET_LOSS_THRESHOLD } } // 实际测量方法 _measure() { return new Promise(resolve => { const start = Date.now() fetch('/ping', { cache: 'no-store' }) .then(() => { const rtt = Date.now() - start resolve({ rtt, packetLoss: 0 }) }) .catch(() => resolve({ rtt: Infinity, packetLoss: 1 })) }) } }
class HybridConnection { constructor() { this.currentProtocol = null this.ws = null this.sse = null this.messageQueue = [] // 消息缓冲队列 } // 智能连接初始化 async connect() { const { isWeak } = await new NetworkProbe().check() this.currentProtocol = isWeak ? 'sse' : 'ws' if (this.currentProtocol === 'ws') { this._initWebSocket() } else { this._initSSE() } } // WebSocket初始化 _initWebSocket() { this.ws = new WebSocket('wss://api.example.com') this.ws.onmessage = this._handleMessage // 发送缓冲队列消息 this.messageQueue.forEach(msg => this.ws.send(msg)) this.messageQueue = [] } // SSE初始化 _initSSE() { this.sse = new EventSource('https://api.example.com/sse') this.sse.onmessage = this._handleMessage } // 统一消息处理 _handleMessage = (event) => { const data = event.data || event // 业务逻辑处理... } // 发送消息(自动选择协议) send(data) { if (this.currentProtocol === 'ws' && this.ws?.readyState === 1) { this.ws.send(JSON.stringify(data)) } else if (this.currentProtocol === 'sse') { // SSE需通过独立HTTP请求发送 fetch('/send', { method: 'POST', body: JSON.stringify(data) }) } else { // 协议切换中暂存消息 this.messageQueue.push(JSON.stringify(data)) } } // 协议切换(核心!) async switchProtocol() { const { isWeak } = await new NetworkProbe().check() // 无需切换 if (isWeak && this.currentProtocol === 'sse') return if (!isWeak && this.currentProtocol === 'ws') return // 执行切换 if (isWeak) { this.ws?.close() this._initSSE() this.currentProtocol = 'sse' } else { this.sse?.close() this._initWebSocket() this.currentProtocol = 'ws' } } }
22.png (43.15 KB, 下载次数: 9)
23.png (26.63 KB, 下载次数: 7)
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
一套纯血鸿蒙NEXT产品级IM系统。
详细介绍 / 产品截图 / 安装
精华主题数超过100个。
积极发起、参与各类话题的讨论等,主题、发帖内容较有价值。
连续任职达1年以上的合格正式版主
为论区做出突出贡献的开发者、版主等。
Copyright © 2014-2025 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.156250 second(s), 41 queries , Gzip On.