引言

服务器推送事件(Server-Sent Events, SSE)是一种基于 HTTP 的单向数据流技术,允许服务器通过标准 HTTP 连接向客户端推送实时更新。SSE 使用 Content-Type: text/event-stream 头部标识响应内容为事件流,特别适合需要服务器向客户端推送更新但不要求双向通信的场景,例如实时通知、股票价格更新或日志流。

背景与技术概述

SSE 是 HTML5 规范的一部分,通过 EventSource API 提供客户端支持。它的主要特点包括:

  • 单向通信:数据仅从服务器流向客户端,无法通过同一连接反向发送。

  • 自动重连:客户端在连接断开后会自动尝试重连。

  • 基于 HTTP:利用现有 HTTP 基础设施,无需额外协议支持。

  • 事件格式:事件以文本形式发送,每条事件以 data: 开头,结束于两个换行符 \n\n。
    在 Go 中,SSE 的实现通常依赖标准库 net/http,也可以结合框架(如 Gin)或第三方库(如 github.com/r3labs/sse)来简化开发。

实现技术细节

1. HTTP 头部配置

服务端必须在响应中设置以下头部:

  • Content-Type: text/event-stream:标识响应为事件流。

  • Cache-Control: no-cache:防止浏览器缓存响应,确保实时性。

  • Connection: keep-alive:保持连接开放,支持持续流式传输。

2. 事件格式与发送

SSE 事件必须遵循特定格式,每条事件包括以下字段:

  • data::事件数据,多个 data: 行会被拼接为一条消息。

  • 事件以两个换行符 \n\n 结束,表示一条事件的结束。
    例如,发送一条消息 "Hello, World!" 的格式为:

    data: Hello, World!

    在 Go 中,事件发送通常通过 http.ResponseWriter 实现。例如,Pascal Allen 的 Medium 文章中使用了 Gin 框架的 c.SSEvent("message", msg) 方法,而 Kelche.co 的示例直接使用 fmt.Fprintf(w, "data: %d \n\n", rand.Intn(100)) 发送随机数。

3. 保持连接与刷新

为了实现流式输出,服务端需要保持 HTTP 连接开放,通常通过无限循环实现。在每个循环中:

  • 生成或获取事件数据。

  • 写入响应,使用 w.(http.Flusher).Flush() 立即刷新,确保数据实时发送。
    例如,Kelche.co 的 randomHandler 函数每 2 秒发送一次随机数:

    for {
        rand.Seed(time.Now().UnixNano())
        fmt.Fprintf(w, "data: %d \n\n", rand.Intn(100))
        w.(http.Flusher).Flush()
        time.Sleep(2 * time.Second)
    }

4. 处理连接关闭

客户端可能随时断开连接,服务端需检测并安全退出。例如,可以通过检查 http.ResponseWriter 的状态或使用 Hijack 方法检测连接状态。在实际应用中,推荐使用通道(channel)或上下文(context)管理连接生命周期。

4.1 使用上下文管理连接生命周期

  • 上下文的作用:上下文可以用来传递取消信号和截止时间。例如,当客户端断开连接时,HTTP 请求的上下文会被取消,服务器可以通过 <-ctx.Done() 检测到。

  • 关键方法:

    • context.Background():创建一个空的根上下文,通常作为父上下文。

    • context.WithCancel(parentCtx):创建一个可手动取消的上下文,cancel() 函数用于取消。

    • context.WithTimeout(parentCtx, duration):创建一个在指定时间后自动取消的上下文,适合设置 SSE 连接的超时。

    • context.WithDeadline(parentCtx, deadline):创建一个在指定截止时间后自动取消的上下文。

  • 在 SSE 中的应用:

    • 在 SSE 处理函数中,使用 ctx := r.Context() 获取 HTTP 请求的上下文。

    • 使用 select 语句监听 <-ctx.Done(),当上下文被取消时(例如客户端断开),执行清理逻辑。

    • 示例代码:go

      		func sseHandler(w http.ResponseWriter, r *http.Request) {
      		    ctx := r.Context()
      		    for {
      		        select {
      		        case <-ctx.Done():
      		            return // 客户端断开,退出
      		        default:
      		            // 发送数据
      		            fmt.Fprintf(w, "data: message\n\n")
      		            w.(http.Flusher).Flush()
      		            time.Sleep(2 * time.Second)
      		        }
      		    }
      		}

    • 这种方式确保当客户端断开时,goroutine 可以及时退出,避免资源泄漏。

4.2 使用通道管理客户端连接

  • 通道的作用:通道可以用来管理多个客户端的连接生命周期,例如添加新客户端、移除断开的客户端和广播消息。

  • 关键结构:

    • 定义一个 SSEServer 结构体,包含:- clients:一个映射(如 map[*SSEClient]struct{}),存储所有活跃客户端。

      • addClient:一个通道(如 chan *SSEClient),用于添加新客户端。

      • removeClient:一个通道(如 chan *SSEClient),用于移除断开的客户端。

    • 每个 SSEClient 包含一个消息通道(如 chan []byte),用于发送数据。

  • 在 SSE 中的应用:

    • 当新客户端连接时,创建一个 SSEClient,初始化其消息通道,并通过 addClient 通道通知服务器。

    • 当客户端断开时,通过 removeClient 通道通知服务器,服务器从 clients 中移除该客户端并关闭其通道。

    • 使用 sync.Mutex 保护 clients 映射的并发访问,确保线程安全。

  • 示例代码:

    type SSEClient struct {
        ID     string
        Stream chan []byte
    }
    
    type SSEServer struct {
        clients      map[*SSEClient]struct{}
        addClient    chan *SSEClient
        removeClient chan *SSEClient
        mutex        sync.Mutex
    }
    
    func (s *SSEServer) Run() {
        for {
            select {
            case client := <-s.addClient:
                s.mutex.Lock()
                s.clients[client] = struct{}{}
                s.mutex.Unlock()
            case client := <-s.removeClient:
                s.mutex.Lock()
                delete(s.clients, client)
                s.mutex.Unlock()
                close(client.Stream)
            }
        }
    }

5. 客户端交互

客户端通过 EventSource API 连接到 SSE 端点。例如:

const eventSource = new EventSource('/random');
eventSource.onmessage = function(event) {
    console.log(event.data); // 处理接收到的随机数
};

EventSource 会自动处理重连,适合需要持续更新的场景。

6. 高级实现与优化

  • 使用框架:Gin 框架提供 SSEvent 方法,简化 SSE 实现。例如:

    c.SSEvent("message", "Hello, Event Stream!")

  • 使用库:github.com/r3labs/sse 提供更高级功能,如事件 ID 和重播支持。

  • 并发处理:对于高并发场景,可使用通道(channel)分发事件,多个 goroutine 处理不同客户端。
    以下表格总结了两种常见实现方式的对比:

    实现方式优点缺点
    标准库 net/http轻量,灵活,适合简单场景需要手动处理连接和刷新,代码复杂
    Gin 框架内置 SSE 支持,简洁,易于扩展依赖框架,增加依赖

7.demo

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "runtime/debug"
    "time"

    "github.com/spf13/cast"
)

func main() {
    defer recovery()

    http.HandleFunc("/chat/send", Send)
    fmt.Println("服务器启动在 http://localhost:8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func Send(w http.ResponseWriter, r *http.Request) {
    // 处理预检请求
    if r.Method == "OPTIONS" {
        w.WriteHeader(http.StatusOK)
        return
    }

    body, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    var params SendRequest
    err = json.Unmarshal(body, &params)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    demo := []string{
        "你好",
        "你是谁",
        "你是做什么的",
        "你是怎么工作的",
        "你是在哪座城市",
        "你是什么星座",
        "你是哪个国家的",
        "你是哪个省的",
        "你是哪个市的",
        "你是哪个区的",
        "你是哪个街道的",
        "你是哪个社区的",
        "你是哪个村的",
    }

    flusher, ok := w.(http.Flusher) // 获取流式输出器
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    //设置header
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    //流式输出
    for _, v := range demo {
        time.Sleep(1 * time.Second)
        lineData := fmt.Sprintf("data: %s\n\n", v)
        io.WriteString(w, lineData)
        flusher.Flush()
    }
}

type SendRequest struct {
    Msg string `json:"msg"`
}

func recovery() {
    if rec := recover(); rec != nil {
        log.Printf("Panic Panic occur")
        if err, ok := rec.(error); ok {
            log.Printf("PanicRecover Unhandled error: %v\n stack:%v", err.Error(), cast.ToString(debug.Stack()))
        } else {
            log.Printf("PanicRecover Panic: %v\n stack:%v", rec, cast.ToString(debug.Stack()))
        }
    }
}

enter image description here

执行一下命令运行:

go mod init

go mod tidy

go run main.go

enter image description here

8.Go转发大模型流式输出demo

sendRequest.Model = "qwen-max"
    streamResp := &proto.StreamResp{}
    qwenClient := service.NewQwen(sendRequest)
    qwenClient.QwenStream(streamResp)

    defer streamResp.HttpResp.Body.Close()

    // 1. 复制下游服务的响应头
    for key, values := range streamResp.HttpResp.Header {
        for _, value := range values {
            w.Header().Add(key, value)
        }
    }

    // 2. 复制下游服务的状态码
    w.WriteHeader(streamResp.HttpResp.StatusCode)

    //流式输出
    // 确保 ResponseWriter 支持 Flusher
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }
    // 处理流式响应
    scanner := bufio.NewScanner(streamResp.HttpResp.Body)
    for scanner.Scan() {
        lineData := scanner.Text()
        // 将响应数据逐步发送给客户端
        io.WriteString(w, lineData+"\n\n")
        flusher.Flush() // 刷新缓冲区
    }

enter image description here

 

结论

在 Go 中实现 Content-Type: text/event-stream 流式输出需设置正确头部、格式化事件数据并保持连接开放。标准库和框架各有优势,开发者可根据需求选择。

推荐参考以下资源深入学习:

  • Streaming Server-Sent Events With Go(https://pascalallen.medium.com/streaming-server-sent-events-with-go-8cc1f615d561)

  • Server-Sent Events (SSE) in Golang([Server-Sent Events (SSE) in Golang](https://www.kelche.co/blog/go/server-sent-events/))

  • Using server-sent events - MDN Web Docs(https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

作者:admin  创建时间:2025-07-23 15:12
最后编辑:admin  更新时间:2025-11-16 12:43