hjjjj 5730b2a798
Some checks failed
CI / Unit tests (push) Has been cancelled
CI / commit_lint (push) Has been cancelled
feat: 支持图片编辑功能并优化多模态计费
refactor: 重构 Gemini 适配器以支持图片编辑和生成

feat(relay): 添加图片编辑模式支持

feat(controller): 实现 UsageAPIURL 用于获取真实 token 用量

feat(web): 在渠道测试中添加模型选择功能

perf(token): 优化多模态 token 计算逻辑

fix(web): 修复日志分页组件显示问题

docs: 更新渠道配置中的 UsageAPIURL 说明

style: 清理调试日志和注释

feat(gemini): 支持 Imagen 3+ 图片生成模型

feat(openai): 添加生成 ID 捕获和元数据获取功能
2026-03-17 18:28:54 +08:00

160 lines
5.1 KiB
Go

package openai
import (
"bufio"
"bytes"
"encoding/json"
"io"
"net/http"
"strings"
"github.com/songquanpeng/one-api/common/render"
"github.com/gin-gonic/gin"
"github.com/songquanpeng/one-api/common"
"github.com/songquanpeng/one-api/common/conv"
"github.com/songquanpeng/one-api/common/logger"
"github.com/songquanpeng/one-api/relay/model"
"github.com/songquanpeng/one-api/relay/relaymode"
)
const (
dataPrefix = "data: "
done = "[DONE]"
dataPrefixLength = len(dataPrefix)
)
// StreamHandler processes an OpenAI-compatible SSE stream and returns
// (error, responseText, usage, generationId).
// generationId is the "id" field captured from the first stream chunk; it can
// be used by callers that need to fetch per-request metadata from an upstream API.
func StreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*model.ErrorWithStatusCode, string, *model.Usage, string) {
responseText := ""
scanner := bufio.NewScanner(resp.Body)
scanner.Split(bufio.ScanLines)
var usage *model.Usage
var generationId string
common.SetEventStreamHeaders(c)
doneRendered := false
for scanner.Scan() {
data := scanner.Text()
if len(data) < dataPrefixLength { // ignore blank line or wrong format
continue
}
if data[:dataPrefixLength] != dataPrefix && data[:dataPrefixLength] != done {
continue
}
if strings.HasPrefix(data[dataPrefixLength:], done) {
render.StringData(c, data)
doneRendered = true
continue
}
switch relayMode {
case relaymode.ChatCompletions:
var streamResponse ChatCompletionsStreamResponse
err := json.Unmarshal([]byte(data[dataPrefixLength:]), &streamResponse)
if err != nil {
logger.SysError("error unmarshalling stream response: " + err.Error())
render.StringData(c, data) // if error happened, pass the data to client
continue // just ignore the error
}
if len(streamResponse.Choices) == 0 && streamResponse.Usage == nil {
// but for empty choice and no usage, we should not pass it to client, this is for azure
continue // just ignore empty choice
}
render.StringData(c, data)
for _, choice := range streamResponse.Choices {
responseText += conv.AsString(choice.Delta.Content)
}
if generationId == "" && streamResponse.Id != "" {
generationId = streamResponse.Id
}
if streamResponse.Usage != nil {
usage = streamResponse.Usage
}
case relaymode.Completions:
render.StringData(c, data)
var streamResponse CompletionsStreamResponse
err := json.Unmarshal([]byte(data[dataPrefixLength:]), &streamResponse)
if err != nil {
logger.SysError("error unmarshalling stream response: " + err.Error())
continue
}
for _, choice := range streamResponse.Choices {
responseText += choice.Text
}
}
}
if err := scanner.Err(); err != nil {
logger.SysError("error reading stream: " + err.Error())
}
if !doneRendered {
render.Done(c)
}
err := resp.Body.Close()
if err != nil {
return ErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), "", nil, ""
}
return nil, responseText, usage, generationId
}
func Handler(c *gin.Context, resp *http.Response, promptTokens int, modelName string) (*model.ErrorWithStatusCode, *model.Usage) {
var textResponse SlimTextResponse
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return ErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
}
err = resp.Body.Close()
if err != nil {
return ErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
}
err = json.Unmarshal(responseBody, &textResponse)
if err != nil {
return ErrorWrapper(err, "unmarshal_response_body_failed", http.StatusInternalServerError), nil
}
if textResponse.Error.Type != "" {
return &model.ErrorWithStatusCode{
Error: textResponse.Error,
StatusCode: resp.StatusCode,
}, nil
}
// Reset response body
resp.Body = io.NopCloser(bytes.NewBuffer(responseBody))
// We shouldn't set the header before we parse the response body, because the parse part may fail.
// And then we will have to send an error response, but in this case, the header has already been set.
// So the HTTPClient will be confused by the response.
// For example, Postman will report error, and we cannot check the response at all.
for k, v := range resp.Header {
c.Writer.Header().Set(k, v[0])
}
c.Writer.WriteHeader(resp.StatusCode)
_, err = io.Copy(c.Writer, resp.Body)
if err != nil {
return ErrorWrapper(err, "copy_response_body_failed", http.StatusInternalServerError), nil
}
err = resp.Body.Close()
if err != nil {
return ErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
}
if textResponse.Usage.TotalTokens == 0 || (textResponse.Usage.PromptTokens == 0 && textResponse.Usage.CompletionTokens == 0) {
completionTokens := 0
for _, choice := range textResponse.Choices {
completionTokens += CountTokenText(choice.Message.StringContent(), modelName)
}
textResponse.Usage = model.Usage{
PromptTokens: promptTokens,
CompletionTokens: completionTokens,
TotalTokens: promptTokens + completionTokens,
}
}
return nil, &textResponse.Usage
}