Eino使用指南
官方文档:https://www.cloudwego.io/zh/docs/eino/overview/eino_open_source/
0、前言
[yach-doc-shimo.zhiyi... 《服务端-实现一个简单的AI Agent 对话机器人》](yach-doc-shimo.zhiyi... 《服务端-实现一个简单的AI Agent 对话机器人》)
[yach-doc-shimo.zhiyi... 《玩转后端go MCP项目开发》](yach-doc-shimo.zhiyi... 《玩转后端go MCP项目开发》)
1、实现一个最简 LLM 应用ß
创建一个main.go文件
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/schema"
)
1.1. 创建模板
func GetTmpMsg() []*schema.Message {
// 创建模板,使用 FString 格式
template := prompt.FromMessages(schema.FString,
// 系统消息模板
schema.SystemMessage("你是一个{role}。你需要用{style}的语气回答问题。你的目标是帮助程序员保持积极乐观的心态,提供技术建议的同时也要关注他们的心理健康。"),
// 插入需要的对话历史(新对话的话这里不填)
schema.MessagesPlaceholder("chat_history", true),
// 用户消息模板
schema.UserMessage("问题: {question}"),
)
// 使用模板生成消息
messages, err := template.Format(context.Background(), map[string]any{
"role": "程序员鼓励师",
"style": "积极、温暖且专业",
"question": "我的代码一直报错,感觉好沮丧,该怎么办?",
// 对话历史(这个例子里模拟两轮对话历史)
"chat_history": []*schema.Message{
schema.UserMessage("你好"),
schema.AssistantMessage("嘿!我是你的程序员鼓励师!记住,每个优秀的程序员都是从 Debug 中成长起来的。有什么我可以帮你的吗?", nil),
schema.UserMessage("我觉得自己写的代码太烂了"),
schema.AssistantMessage("每个程序员都经历过这个阶段!重要的是你在不断学习和进步。让我们一起看看代码,我相信通过重构和优化,它会变得更好。记住,Rome wasn't built in a day,代码质量是通过持续改进来提升的。", nil),
},
})
if err != nil {
log.Fatal(err)
}
return messages
}
1.2. 构建ChatModel
func BuildChatModel(messages []*schema.Message) (*openai.ChatModel, error) {
customTransport := &CustomTransport{
APIKey: "1000081625:82b5fb8befca19337bde03feb74a8a9b", // 设置API-KEY值
}
httpClient := &http.Client{
Transport: customTransport,
}
// http://ai-service-test.tal.com/openai-compatible/v1/chat/completions
chatModel, err := openai.NewChatModel(context.Background(), &openai.ChatModelConfig{
Model: "gpt-4o", // 使用的模型版本
APIKey: "xxxxxxx:xxxxxxxxx", // OpenAI API 密钥
BaseURL: "http://xxxxxxx/openai-compatible/v1", // OpenAI API 地址
HTTPClient: httpClient, // 使用自定义HTTPClient
})
return chatModel, err
}
// CustomTransport 添加自定义请求头的Transport
type CustomTransport struct {
Transport http.RoundTripper
APIKey string
}
func (t *CustomTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// 添加API-KEY请求头
req.Header.Set("API-KEY", t.APIKey)
// 调试信息:打印请求详情
fmt.Printf("=== HTTP请求调试信息 ===\n")
fmt.Printf("请求方法: %s\n", req.Method)
fmt.Printf("请求URL: %s\n", req.URL.String())
fmt.Printf("请求头:\n")
for key, values := range req.Header {
for _, value := range values {
fmt.Printf(" %s: %s\n", key, value)
}
}
// 如果没有设置Transport,使用默认的
if t.Transport == nil {
t.Transport = http.DefaultTransport
}
return t.Transport.RoundTrip(req)
}
1.3. 发起请求
func main() {
//1.获取完整的模板消息
messages := GetTmpMsg()
output, _ := json.MarshalIndent(messages, "", " ")
fmt.Println("-------------------------------1111---------------------------------start")
fmt.Println(string(output))
fmt.Println("-------------------------------1111---------------------------------end")
//2.构建ChatModel
chatModel, err := BuildChatModel(messages)
if err != nil {
log.Fatal(err)
return
}
response, err := chatModel.Generate(context.Background(), messages)
if err != nil {
log.Fatal(err)
}
fmt.Println("-------------------------------22222---------------------------------start")
fmt.Printf("AI回复: %s\n", response.Content)
fmt.Println("-------------------------------22222---------------------------------end")
}
go mod init
go mod tidy
go run main.go
2、构建Tool
在 Eino 中,要实现 Agent 主要需要两个核心部分:ChatModel 和 Tool
2.1.NewTool
import (
"context"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/components/tool/utils"
"github.com/cloudwego/eino/schema"
)
// 处理函数
func AddTodoFunc(_ context.Context, params *TodoAddParams) (string, error) {
// Mock处理逻辑
return `{"msg": "add todo success"}`, nil
}
func GetAddTodoTool() tool.InvokableTool {
// 工具信息
info := &schema.ToolInfo{
Name: "add_todo",
Desc: "Add a todo item",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"content": {
Desc: "The content of the todo item",
Type: schema.String,
Required: true,
},
"started_at": {
Desc: "The started time of the todo item, in unix timestamp",
Type: schema.Integer,
},
"deadline": {
Desc: "The deadline of the todo item, in unix timestamp",
Type: schema.Integer,
},
}),
}
// 使用NewTool创建工具
return utils.NewTool(info, AddTodoFunc)
}
2.2. InferTool
这种方式更加简洁,通过结构体的 tag 来定义参数信息,就能实现参数结构体和描述信息同源,无需维护两份信息
// 使用 InferTool 创建工具
updateTool, err := utils.InferTool(
"update_todo", // tool name
"Update a todo item, eg: content,deadline...", // tool description
localTool.UpdateTodoFunc)
// 参数结构体
type TodoUpdateParams struct {
ID string `json:"id" jsonschema:"description=id of the todo"`
Content *string `json:"content,omitempty" jsonschema:"description=content of the todo"`
StartedAt *int64 `json:"started_at,omitempty" jsonschema:"description=start time in unix timestamp"`
Deadline *int64 `json:"deadline,omitempty" jsonschema:"description=deadline of the todo in unix timestamp"`
Done *bool `json:"done,omitempty" jsonschema:"description=done status"`
}
// 处理函数
func UpdateTodoFunc(_ context.Context, params *TodoUpdateParams) (string, error) {
// Mock处理逻辑
return `{"msg": "update todo success"}`, nil
}
2.3. 实现 Tool 接口
2.4. 官方工具
Bing Search Tool : Bing 搜索工具
BrowserUse Tool :浏览器使用工具
CommandLine Tool :命令行工具
Googlesearch Tool : 谷歌搜索工具
HttpRequest Tool :Http 请求工具
Sequential Thinking Tool : 序列思维工具
Wikipedia Tool :维基百科工具
DuckDuckGoSearch : DuckDuckGo搜搜工具
MCP Tool : 可以直接访问已有 MCP Server 上的资
此处贴上之前总结的用go实现MCP文档
3、Components 组件
3.1. Document Loader
用于加载文档的组件,它的主要作用是从不同来源(如网络 URL、本地文件等)加载文档内容,并将其转换为标准的文档格式。这个组件在处理需要从各种来源获取文档内容的场景中发挥重要作用,比如:
从网络 URL 加载网页内容
读取本地 PDF、Word 等格式的文档
3.1.1 单独使用
load方法说明:
功能:从指定的数据源加载文档
参数:
ctx:上下文对象,用于传递请求级别的信息,同时也用于传递 Callback Manager
src:文档来源,包含文档的 URI 信息
opts:加载选项,用于配置加载行为
返回值:
[]*schema.Document:加载的文档列表error:加载过程中的错误信息
import (
"github.com/cloudwego/eino/components/document"
"github.com/cloudwego/eino-ext/components/document/loader/file"
)
//加载文档
// 初始化 loader (以file loader为例)
loader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{
// 配置参数
UseNameAsID: true,
})
// 加载文档
filePath := "./doc/study.md"
docs, _ := loader.Load(ctx, document.Source{
URI: filePath,
})
Document结构体
ID:文档的唯一标识符,用于在系统中唯一标识一个文档
Content:文档的实际内容
MetaData:文档的元数据,可以存储如下信息:
文档的来源信息
文档的向量表示(用于向量检索)
文档的分数(用于排序)
文档的子索引(用于分层检索)
其他自定义元数据
type Document struct {
// ID 是文档的唯一标识符
ID string
// Content 是文档的内容
Content string
// MetaData 用于存储文档的元数据信息
MetaData map[string]any
}
3.1.2 在编排中使用
// 在 Chain 中使用
chain := compose.NewChain[document.Source, []*schema.Document]()
chain.AppendLoader(loader)
// 编译并运行
runnable, _ := chain.Compile()
result, _ := runnable.Invoke(ctx, input)
// 在 Graph 中使用
graph := compose.NewGraph[string, []*schema.Document]()
graph.AddLoaderNode("loader_node", loader)
3.1.3 官方Document组件
Loader - amazon s3 : 用于从 AWS S3 存储桶中加载文档内容
Loader - local file : 用于从本地文件系统中加载文档内容
Loader - web url : 用于从网络 URL 中加载文档内容
Parser - html : 用于将 HTML 网页内容解析为纯文本
Parser - pdf : 用于将 PDF 文件内容解析为纯文本
Splitter - recursive : 用于将长文档按照指定大小递归地切分成更小的片段
Splitter - semantic : 用于基于语义相似度将长文档切分成更小的片段
Splitter - markdown : 用于根据 Markdown 文档的标题层级结构进行分割
3.1.4 Document Parser
用于解析文档内容的工具包
3.2. Embedding
用于将文本转换为向量表示的组件。它的主要作用是将文本内容映射到向量空间,使得语义相似的文本在向量空间中的距离较近。这个组件在以下场景中发挥重要作用:
文本相似度计算
语义搜索
文本聚类分析
3.3. Retriever
用于从各种数据源检索文档的组件。它的主要作用是根据用户的查询(query)从文档库中检索出最相关的文档。这个组件在以下场景中特别有用:
基于向量相似度的文档检索
基于关键词的文档搜索
知识库问答系统 (rag)
3.4. Document Transformer
Document Transformer 是一个用于文档转换和处理的组件。它的主要作用是对输入的文档进行各种转换操作,如分割、过滤、合并等,从而得到满足特定需求的文档。这个组件可用于以下场景中:
将长文档分割成小段落以便于处理
根据特定规则过滤文档内容
对文档内容进行结构化转换
提取文档中的特定部分
3.5. Lambda
它允许用户在工作流中嵌入自定义的函数逻辑。Lambda 组件底层是由输入输出是否流所形成的 4 种运行函数组成,对应 4 种交互模式: Invoke、Stream、Collect、Transform。
4、 编排功能
据场景化的业务逻辑,对各种能力进行组合、串联,大模型应用的开发有其自身典型的特征: 自定义的业务逻辑本身不会很复杂,几乎主要都是对『原子能力』的组合串联。
4.1 Graph
4.1.1创建 Graph 实例
const (
nodeKeyOfTemplate = "template"
nodeKeyOfChatModel = "chat_model"
nodeKeyOfTools = "tools"
)
// 6. 创建一个 Graph 实例
// 输入:map[string]any - 用于ChatTemplate的变量替换
// 输出:[]*schema.Message - 最终的消息列表
g := compose.NewGraph[map[string]any, []*schema.Message]()
4.1.2 添加节点到Graph
// 7. 添加ChatTemplate节点到Graph
_ = g.AddChatTemplateNode(nodeKeyOfTemplate, chatTpl)
// 8. 添加ChatModel节点到Graph
_ = g.AddChatModelNode(nodeKeyOfChatModel, chatModel)
// 9. 添加ToolsNode节点到Graph
_ = g.AddToolsNode(nodeKeyOfTools, toolsNode)
添加三个处理节点:
template: 处理消息模板和变量替换
chat_model: AI模型进行推理
tools: 执行工具调
4.1.2.1 输入类型解释(变量替换过程)
变量的替换输入 : map[string]any
out, err := r.Invoke(ctx, map[string]any{
"message_histories": []*schema.Message{},
"user_query": "我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产",
})
替换过程详解
// Format formats the chat template with the given context and variables.
func (t *DefaultChatTemplate) Format(ctx context.Context,
vs map[string]any, _ ...Option) (result []*schema.Message, err error) {
result = make([]*schema.Message, 0, len(t.templates))
for _, template := range t.templates {
msgs, err := template.Format(ctx, vs, t.formatType)
if err != nil {
return nil, err
}
result = append(result, msgs...)
}
return result, nil
}
接收 map[string]any 变量映射
遍历每个模板组件
调用每个组件的 Format 方法进行变量替换
返回完整的消息列表
替换后的消息
[
{
"role": "system",
"content": "你是一名房产经纪人,结合用户的薪酬和工作,使用 user_info API,为其提供相关的房产信息。邮箱是必须的"
},
{
"role": "user",
"content": "我叫 zhangsan, 邮箱是 zhangsan@bytedance.com, 帮我推荐一处房产"
}
]
4.1.2.2 输出类型解释(工具节点输入输出过程)
输入:AssistantMessage with ToolCalls
{
"role": "assistant",
"content": null,
"tool_calls": [
{
"id": "call_123",
"function": {
"name": "user_info",
"arguments": "{\"name\":\"zhangsan\",\"email\":\"zhangsan@bytedance.com\"}"
}
}
]
}
ToolsNode 处理过程
ret := make([]*schema.Message, n)
ret[index] = schema.ToolMessage(s, callID, schema.WithToolName(callName))
return ret, nil
处理步骤:
解析AssistantMessage中的ToolCalls
并行或顺序执行每个工具调用
收集工具执行结果
构造ToolMessage返回
输出:[]*schema.Message (ToolMessage数组)
// ToolMessage represents a message with Role "tool".
func ToolMessage(content string, toolCallID string, opts ...ToolMessageOption) *Message {
o := &toolMessageOptions{}
for _, opt := range opts {
opt(o)
}
return &Message{
Role: Tool,
Content: content,
ToolCallID: toolCallID,
ToolName: o.toolName,
}
}
实际输出示例,执行了一个 user_info 工具调用,输出将是:
[
{
"role": "tool",
"content": "{\"name\":\"zhangsan\",\"email\":\"zhangsan@bytedance.com\",\"company\":\"Facebook\",\"position\":\"CTO\",\"salary\":\"10000000\"}",
"tool_call_id": "call_123",
"tool_name": "user_info"
}
]
4.1.3构建节点连接关系
// 10. add connection between nodes
_ = g.AddEdge(compose.START, nodeKeyOfTemplate)
_ = g.AddEdge(nodeKeyOfTemplate, nodeKeyOfChatModel)
_ = g.AddEdge(nodeKeyOfChatModel, nodeKeyOfTools)
_ = g.AddEdge(nodeKeyOfTools, compose.END)
构建节点连接关系 : START → template → chat_model → tools → END
数据流动过程
START: 接收 map[string]any 输入
template: 将变量替换到消息模板中,输出 []*schema.Message
chat_model: AI 处理消息,可能返回工具调用
tools: 执行工具调用,返回结果消息
END: 输出最终的 []*schema.Message
4.1.4编译和执行
// 9. compile Graph[I, O] to Runnable[I, O]
r, err := g.Compile(ctx)
if err != nil {
logs.Errorf("Compile failed, err=%v", err)
return
}
4.2 Chain
可以理解它就是Graph的简化,本质是一样的逻辑
4.3 编排的设计理念
基本假设:前一个运行节点的输出值,可以作为下一个节点的输入值。在 Go 中,要实现这个假设,有两个基本方案:
方案1:把不同节点的输入输出都变成一种更泛化的类型,例如 any 、map[string]any 等。
采用泛化成 any 的方案,但对应的代价是: 开发者在写代码时,需要显式转换成具体类型才能使用。这会极大增加开发者的心智负担,因此最终放弃此方案。
langchain 的方案可以看做是全程传递
map[string]any,各个逻辑节点根据自己的需要,用对应的 key 去取对应的 value。在 langchaingo 的实现中,即是按照这种方式实现,但同样,golang 中的 any 要被使用依然要使用类型断言才可使用。这种方案在开发者使用时依然有很大的心智负担。
方案2:每一个节点的输入输出类型保持开发者的预期,在 Compile 阶段保证上下游的类型是一致的。
方案 2 即是 eino 最终选定的方案。
graph中的类型对齐
chain中的类型对齐
func TestChain() {
chain := compose.NewChain[map[string]interface,string]()
nodeTemplate := &fakeChatTemplate{} // input: map[string]any, output: []*schema.Message
nodeHistoryLambda := &fakeLambda{} // input: []*schema.Message, output: []*schema.Message
nodeChatModel := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
nodeConvertResLambda := &fakeLambda{} // input: *schema.Message, output: string
chain.
AppendChatTemplate(nodeTemplate).
AppendLambda(nodeHistoryLambda).
AppendChatModel(nodeChatModel).
AppendLambda(nodeConvertResLambda)
}
parallel
parallel 在 chain 中是一类特殊的节点,从 chain 的角度看 parallel 和其他的节点没啥区别。在 parallel 内部,其基本拓扑结构如下:
func TestParallel() {
chain := compose.NewChain[map[string]any, map[string]*schema.Message]()
parallel := compose.NewParallel()
model01 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
model02 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
model03 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
parallel.
AddChatModel("outkey_01", model01).
AddChatModel("outkey_02", model02).
AddChatModel("outkey_03", model03)
lambdaNode := &fakeLambdaNode{} // input: map[string]any, output: map[string]*schema.Message
chain.
AppendParallel(parallel).
AppendLambda(lambdaNode)
}
一个 parallel 在 chain 中的视角如下:
5、 Flow 集成
5.1 React Agent
react agent 底层使用 compose.Graph 作为编排方案,一般来说有 2 个节点: ChatModel、Tools,中间运行过程中的所有历史消息都会放入 state 中,在将所有历史消息传递给 ChatModel 之前,会 copy 消息交由 MessageModifier 进行处理,处理的结果再传递给 ChatModel。直到 ChatModel 返回的消息中不再有 tool call,则返回最终消息,当 Tools 列表中至少有一个 Tool 配置了 ReturnDirectly 时,ReAct Agent 结构会更复杂:在 ToolsNode 之后会增加一个 Branch,判断是否调用了一个 ReturnDirectly 的 Tool,如果是,直接 END,否则照旧进入 ChatModel。
5.1.1 初始化
提供了 ReactAgent 初始化函数,必填参数为 Model 和 ToolsConfig,选填参数为 MessageModifier, MaxStep, ToolReturnDirectly 和 StreamToolCallChecker.
5.1.2 Model
model 接收一个 ChatModel,在 agent 内部,会调用 BindTools 接口,定义为:
type ChatModel interface {
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
*schema.StreamReader[*schema.Message], error)
BindTools(tools []*schema.ToolInfo) error
}
5.1.3 MessageModifier
MessageModifier 会在每次把所有历史消息传递给 ChatModel 之前执行,定义为:
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: toolableChatModel,
ToolsConfig: tools,
MessageModifier: func(ctx context.Context, input []*schema.Message) []*schema.Message {
res := make([]*schema.Message, 0, len(input)+1)
res = append(res, schema.SystemMessage("你是一个 Go 开发专家."))
res = append(res, input...)
return res
},
})
agent.Generate(ctx, []*schema.Message{schema.UserMessage("写一个 hello world 的代码")})
// 模型得到的实际输入为:
// []*schema.Message{
// {Role: schema.System, Content: "You are an expert Go developer."},
// {Role: schema.Human, Content: "Write a hello world code"}
//}
5.1.4 MaxStep
指定 Agent 最大运行步长,每次从一个节点转移到下一个节点为一步,默认值为 node 个数 + 2。
由于 Agent 中一次循环为 ChatModel + Tools,即为 2 步,因此默认值 12 最多可运行 6 个循环。但由于最后一步必须为 ChatModel 返回 (因为 ChatModel 结束后判断无须运行 tool 才能返回最终结果),因此最多运行 5 次 tool。
同理,若希望最多可运行 10 个循环 (10 次 ChatModel + 9 次 Tools),则需要设置 MaxStep 为 20。若希望最多运行 20 个循环,则 MaxStep 需为 40。
func main() {
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: toolableChatModel,
ToolsConfig: tools,
MaxStep: 20,
}
}
5.1.5 ToolReturnDirectly
如果希望当 ChatModel 选择了特定的 Tool 并执行后,Agent 直接把 Tool 的 Response ToolMessage 返回去,则可以在 ToolReturnDirectly 中配置这个 Tool。
a, err = NewAgent(ctx, &AgentConfig{
ToolCallingModel: cm,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{fakeTool, fakeStreamTool},
},
MaxStep: 40,
ToolReturnDirectly: map[string]struct{}{fakeToolName: {}}, // one of the two tools is return directly
})
5.1.5 StreamToolCallChecker
不同的模型在流式模式下输出工具调用的方式可能不同: 某些模型(如 OpenAI) 会直接输出工具调用;某些模型 (如 Claude) 会先输出文本,然后再输出工具调用。因此需要使用不同的方法来判断,这个字段用来指定判断模型流式输出中是否包含工具调用的函数。
func firstChunkStreamToolCallChecker(_ context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
defer sr.Close()
for {
msg, err := sr.Recv()
if err == io.EOF {
return false, nil
}
if err != nil {
return false, err
}
if len(msg.ToolCalls) > 0 {
return true, nil
}
if len(msg.Content) == 0 { // skip empty chunks at the front
continue
}
return false, nil
}
}
上述默认实现适用于:模型输出的 Tool Call Message 中只有 Tool Call。¡
默认实现不适用的情况:在输出 Tool Call 前,有非空的 content chunk。此时,需要自定义 tool Call checker 如下:
toolCallChecker := func(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
defer sr.Close()
for {
msg, err := sr.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
return false, err
}
if len(msg.ToolCalls) > 0 {
return true, nil
}
}
return false, nil
}
尝试添加 prompt 来约束模型在工具调用时不额外输出文本,例如:“如果需要调用tool,直接输出tool,不要输出文本”。
不同模型受 prompt 影响可能不同,实际使用时需要自行调整prompt并验证效果。
5.1.6 Generate
agent, _ := react.NewAgent(...)
var outMessage *schema.Message
outMessage, err = agent.Generate(ctx, []*schema.Message{
schema.UserMessage("写一个 Go 的 hello world 程序"),
})
5.1.7 Stream
agent, _ := react.NewAgent(...)
var msgReader *schema.StreamReader[*schema.Message]
msgReader, err = agent.Stream(ctx, []*schema.Message{
schema.UserMessage("写一个 Go 的 hello world 程序"),
})
for {
// msg type is *schema.Message
msg, err := msgReader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// finish
break
}
// error
log.Printf("failed to recv: %v\n", err)
return
}
fmt.Print(msg.Content)
}
5.2 Host Multi Agent
最后编辑:admin 更新时间:2025-11-10 16:48